Adding mypy typing

This commit is contained in:
William Stearns
2024-06-15 23:22:43 -04:00
parent 53b0e35b0c
commit a29ee840f2
10 changed files with 180 additions and 157 deletions

View File

@@ -11,7 +11,7 @@ import threading
import time
import traceback
from queue import Queue
from typing import List, NoReturn, Union
from typing import Any, Dict, List, NoReturn, Optional, Set, Tuple, Union
from google.protobuf.json_format import MessageToJson
from google.protobuf.message import Message
@@ -25,23 +25,23 @@ from meshtastic.supported_device import supported_devices
from meshtastic.version import get_active_version
"""Some devices such as a seger jlink we never want to accidentally open"""
blacklistVids = dict.fromkeys([0x1366])
blacklistVids: Dict = dict.fromkeys([0x1366])
def quoteBooleans(a_string):
def quoteBooleans(a_string: str) -> str:
"""Quote booleans
given a string that contains ": true", replace with ": 'true'" (or false)
"""
tmp = a_string.replace(": true", ": 'true'")
tmp: str = a_string.replace(": true", ": 'true'")
tmp = tmp.replace(": false", ": 'false'")
return tmp
def genPSK256():
def genPSK256() -> bytes:
"""Generate a random preshared key"""
return os.urandom(32)
def fromPSK(valstr):
def fromPSK(valstr: str) -> Any:
"""A special version of fromStr that assumes the user is trying to set a PSK.
In that case we also allow "none", "default" or "random" (to have python generate one), or simpleN
"""
@@ -58,7 +58,7 @@ def fromPSK(valstr):
return fromStr(valstr)
def fromStr(valstr):
def fromStr(valstr: str) -> Any:
"""Try to parse as int, float or bool (and fallback to a string as last resort)
Returns: an int, bool, float, str or byte array (for strings of hex digits)
@@ -66,6 +66,7 @@ def fromStr(valstr):
Args:
valstr (string): A user provided string
"""
val: Any
if len(valstr) == 0: # Treat an emptystring as an empty bytes
val = bytes()
elif valstr.startswith("0x"):
@@ -88,7 +89,7 @@ def fromStr(valstr):
return val
def pskToString(psk: bytes):
def pskToString(psk: bytes) -> str:
"""Given an array of PSK bytes, decode them into a human readable (but privacy protecting) string"""
if len(psk) == 0:
return "unencrypted"
@@ -110,12 +111,12 @@ def stripnl(s) -> str:
return " ".join(s.split())
def fixme(message):
def fixme(message: str) -> None:
"""Raise an exception for things that needs to be fixed"""
raise Exception(f"FIXME: {message}") # pylint: disable=W0719
def catchAndIgnore(reason, closure):
def catchAndIgnore(reason: str, closure) -> None:
"""Call a closure but if it throws an exception print it and continue"""
try:
closure()
@@ -130,7 +131,7 @@ def findPorts(eliminate_duplicates: bool=False) -> List[str]:
Returns:
list -- a list of device paths
"""
l = list(
l: List = list(
map(
lambda port: port.device,
filter(
@@ -156,12 +157,12 @@ class dotdict(dict):
class Timeout:
"""Timeout class"""
def __init__(self, maxSecs: int=20):
def __init__(self, maxSecs: int=20) -> None:
self.expireTime: Union[int, float] = 0
self.sleepInterval: float = 0.1
self.expireTimeout: int = maxSecs
def reset(self):
def reset(self) -> None:
"""Restart the waitForSet timer"""
self.expireTime = time.time() + self.expireTimeout
@@ -220,7 +221,7 @@ class Timeout:
class Acknowledgment:
"A class that records which type of acknowledgment was just received, if any."
def __init__(self):
def __init__(self) -> None:
"""initialize"""
self.receivedAck = False
self.receivedNak = False
@@ -229,7 +230,7 @@ class Acknowledgment:
self.receivedTelemetry = False
self.receivedPosition = False
def reset(self):
def reset(self) -> None:
"""reset"""
self.receivedAck = False
self.receivedNak = False
@@ -242,17 +243,17 @@ class Acknowledgment:
class DeferredExecution:
"""A thread that accepts closures to run, and runs them as they are received"""
def __init__(self, name=None):
self.queue = Queue()
def __init__(self, name=None) -> None:
self.queue: Queue = Queue()
self.thread = threading.Thread(target=self._run, args=(), name=name)
self.thread.daemon = True
self.thread.start()
def queueWork(self, runnable):
def queueWork(self, runnable) -> None:
"""Queue up the work"""
self.queue.put(runnable)
def _run(self):
def _run(self) -> None:
while True:
try:
o = self.queue.get()
@@ -272,7 +273,7 @@ def our_exit(message, return_value=1) -> NoReturn:
sys.exit(return_value)
def support_info():
def support_info() -> None:
"""Print out info that helps troubleshooting of the cli."""
print("")
print("If having issues with meshtastic cli or python library")
@@ -301,7 +302,7 @@ def support_info():
print("Please add the output from the command: meshtastic --info")
def remove_keys_from_dict(keys, adict):
def remove_keys_from_dict(keys: Union[Tuple, List, Set], adict: Dict) -> Dict:
"""Return a dictionary without some keys in it.
Will removed nested keys.
"""
@@ -316,33 +317,33 @@ def remove_keys_from_dict(keys, adict):
return adict
def hexstr(barray):
def hexstr(barray: bytes) -> str:
"""Print a string of hex digits"""
return ":".join(f"{x:02x}" for x in barray)
def ipstr(barray):
def ipstr(barray: bytes) -> str:
"""Print a string of ip digits"""
return ".".join(f"{x}" for x in barray)
def readnet_u16(p, offset):
def readnet_u16(p, offset: int) -> int:
"""Read big endian u16 (network byte order)"""
return p[offset] * 256 + p[offset + 1]
def convert_mac_addr(val):
def convert_mac_addr(val: bytes) -> Union[str, bytes]:
"""Convert the base 64 encoded value to a mac address
val - base64 encoded value (ex: '/c0gFyhb'))
returns: a string formatted like a mac address (ex: 'fd:cd:20:17:28:5b')
"""
if not re.match("[0-9a-f]{2}([-:]?)[0-9a-f]{2}(\\1[0-9a-f]{2}){4}$", val):
val_as_bytes = base64.b64decode(val)
if not re.match("[0-9a-f]{2}([-:]?)[0-9a-f]{2}(\\1[0-9a-f]{2}){4}$", val): #FIXME - does the regex have to be bytes too to match val since val is bytes?
val_as_bytes: bytes = base64.b64decode(val)
return hexstr(val_as_bytes)
return val
def snake_to_camel(a_string):
def snake_to_camel(a_string: str) -> str:
"""convert snake_case to camelCase"""
# split underscore using split
temp = a_string.split("_")
@@ -351,16 +352,16 @@ def snake_to_camel(a_string):
return result
def camel_to_snake(a_string):
def camel_to_snake(a_string: str) -> str:
"""convert camelCase to snake_case"""
return "".join(["_" + i.lower() if i.isupper() else i for i in a_string]).lstrip(
"_"
)
def detect_supported_devices():
def detect_supported_devices() -> Set:
"""detect supported devices based on vendor id"""
system = platform.system()
system: str = platform.system()
# print(f'system:{system}')
possible_devices = set()
@@ -418,9 +419,9 @@ def detect_supported_devices():
return possible_devices
def detect_windows_needs_driver(sd, print_reason=False):
def detect_windows_needs_driver(sd, print_reason=False) -> bool:
"""detect if Windows user needs to install driver for a supported device"""
need_to_install_driver = False
need_to_install_driver: bool = False
if sd:
system = platform.system()
@@ -446,7 +447,7 @@ def detect_windows_needs_driver(sd, print_reason=False):
return need_to_install_driver
def eliminate_duplicate_port(ports):
def eliminate_duplicate_port(ports: List) -> List:
"""Sometimes we detect 2 serial ports, but we really only need to use one of the ports.
ports is a list of ports
@@ -479,9 +480,9 @@ def eliminate_duplicate_port(ports):
return new_ports
def is_windows11():
def is_windows11() -> bool:
"""Detect if Windows 11"""
is_win11 = False
is_win11: bool = False
if platform.system() == "Windows":
if float(platform.release()) >= 10.0:
patch = platform.version().split(".")[2]
@@ -495,7 +496,7 @@ def is_windows11():
return is_win11
def get_unique_vendor_ids():
def get_unique_vendor_ids() -> Set[str]:
"""Return a set of unique vendor ids"""
vids = set()
for d in supported_devices:
@@ -504,7 +505,7 @@ def get_unique_vendor_ids():
return vids
def get_devices_with_vendor_id(vid):
def get_devices_with_vendor_id(vid: str) -> Set: #Set[SupportedDevice]
"""Return a set of unique devices with the vendor id"""
sd = set()
for d in supported_devices:
@@ -513,11 +514,11 @@ def get_devices_with_vendor_id(vid):
return sd
def active_ports_on_supported_devices(sds, eliminate_duplicates=False):
def active_ports_on_supported_devices(sds, eliminate_duplicates=False) -> Set[str]:
"""Return a set of active ports based on the supplied supported devices"""
ports = set()
baseports = set()
system = platform.system()
ports: Set = set()
baseports: Set = set()
system: str = platform.system()
# figure out what possible base ports there are
for d in sds:
@@ -575,13 +576,13 @@ def active_ports_on_supported_devices(sds, eliminate_duplicates=False):
for com_port in com_ports:
ports.add(com_port)
if eliminate_duplicates:
ports = eliminate_duplicate_port(list(ports))
ports.sort()
ports = set(ports)
portlist: List = eliminate_duplicate_port(list(ports))
portlist.sort()
ports = set(portlist)
return ports
def detect_windows_port(sd):
def detect_windows_port(sd) -> Set[str]: #"sd" is a SupportedDevice from meshtastic.supported_device
"""detect if Windows port"""
ports = set()
@@ -606,11 +607,11 @@ def detect_windows_port(sd):
return ports
def check_if_newer_version():
def check_if_newer_version() -> Optional[str]:
"""Check pip to see if we are running the latest version."""
pypi_version = None
pypi_version: Optional[str] = None
try:
url = "https://pypi.org/pypi/meshtastic/json"
url: str = "https://pypi.org/pypi/meshtastic/json"
data = requests.get(url, timeout=5).json()
pypi_version = data["info"]["version"]
except Exception:
@@ -620,6 +621,10 @@ def check_if_newer_version():
try:
parsed_act_version = pkg_version.parse(act_version)
parsed_pypi_version = pkg_version.parse(pypi_version)
#Note: if handed "None" when we can't download the pypi_version,
#this gets a TypeError:
#"TypeError: expected string or bytes-like object, got 'NoneType'"
#Handle that below?
except pkg_version.InvalidVersion:
return pypi_version