Compare commits

...

41 Commits
2.5.3 ... 2.5.6

Author SHA1 Message Date
github-actions
01ffd83d64 bump version to 2.5.6 2024-12-20 20:42:21 +00:00
Ian McEwen
7c89e231bd Merge pull request #711 from ianmcorvidae/optionalize-deps
Make several dependencies optional (dotmap, print_color, and pyqrcode)
2024-12-12 21:19:15 -07:00
Ian McEwen
4673824236 Add the newly optional dependencies to an extras group 2024-12-12 21:14:58 -07:00
Ian McEwen
d87eddfd33 Make pyqrcode optional 2024-12-12 21:01:38 -07:00
Ian McEwen
31f322f1c2 Make dotmap (via meshtastic.test) and print_color optional 2024-12-12 20:59:22 -07:00
Ian McEwen
89b41c1a19 Remove pexpect too 2024-12-12 20:40:59 -07:00
Ian McEwen
1a5ca789c2 Remove pyparsing and webencodings dependencies, unsure why they're here at all 2024-12-12 19:48:29 -07:00
Ian McEwen
03ac322583 reset mypy-protobuf to non-optional, hopefully without breaking stuff 2024-11-26 14:35:48 -07:00
Ian McEwen
c63814358a prep 2.5.6a0 & protobufs 2024-11-26 14:31:33 -07:00
github-actions
663fabce74 bump version to 2.5.5 2024-11-26 21:22:11 +00:00
Ian McEwen
6243965044 Merge pull request #707 from meshtastic/request-localstats
Allow request-telemetry to elicit LocalStats response
2024-11-24 11:31:01 -07:00
Ben Meadors
b180b6fb15 Allow request-telemetry to illicit LocalStats response 2024-11-24 08:19:40 -06:00
Ian McEwen
4c7ac60be6 appease mypy too 2024-11-15 11:55:25 -07:00
Ian McEwen
0b086d10f8 appease pylint 2024-11-15 11:50:35 -07:00
Ian McEwen
426795fccd semi-experimentally, add fallback code for message_to_json, allowing older protobuf library versions 2024-11-15 11:47:17 -07:00
Ian McEwen
fb88ee114c bump protobufs to 2.5.5 2024-11-01 09:06:02 -07:00
github-actions
a4630b53eb bump version to 2.5.4 2024-11-01 16:03:00 +00:00
Ian McEwen
646aa981d5 update readme to link to docs again 2024-10-29 14:22:09 -07:00
Ian McEwen
9381acd6ac Merge pull request #681 from william-stearns/wls_add_types2
Wls add types2
2024-10-29 06:50:52 -07:00
Ian McEwen
384063db19 Fix some remaining mypy complaints 2024-10-29 06:47:16 -07:00
Ian McEwen
20d75d9023 Merge branch 'master' into wls_add_types2 2024-10-29 06:19:16 -07:00
Ian McEwen
0deb1d788f Merge pull request #702 from meshtastic/dependabot/pip/werkzeug-3.0.6
Bump werkzeug from 3.0.4 to 3.0.6
2024-10-28 20:36:24 -07:00
Ian McEwen
1070d9202b Merge pull request #698 from lachesis/lachesis/fix-remote-module-cfg-get
Add missing camel to snake conversion
2024-10-28 20:35:10 -07:00
Ian McEwen
b90de8b73b Merge pull request #701 from fmoessbauer/master
A lot of fixes around setting / retrieving base64 encoded values
2024-10-28 20:29:05 -07:00
dependabot[bot]
2e79ecf759 Bump werkzeug from 3.0.4 to 3.0.6
Bumps [werkzeug](https://github.com/pallets/werkzeug) from 3.0.4 to 3.0.6.
- [Release notes](https://github.com/pallets/werkzeug/releases)
- [Changelog](https://github.com/pallets/werkzeug/blob/main/CHANGES.rst)
- [Commits](https://github.com/pallets/werkzeug/compare/3.0.4...3.0.6)

---
updated-dependencies:
- dependency-name: werkzeug
  dependency-type: indirect
...

Signed-off-by: dependabot[bot] <support@github.com>
2024-10-26 00:29:31 +00:00
Felix Moessbauer
578d3e4b24 do not double-print value when setting a repeated value
When clearning or appending to a repeated value, both the "Clearing..."
/ "Adding..." line and the "Set..." line were shown. However, this is
misleading as the only performed operation is the clearing / appending.

We fix this by directly returning from the function in case of clearing
/ appending.
2024-10-25 17:11:06 +02:00
Felix Moessbauer
4ca13bcede fix setting of list item on configure
When setting the whole configuration via --configure, list types like
adminKey need special handling. Previously this failed as a list cannot
be appended to a list. The new code adds dedicated logic to replace the
repeated value when passing a list. Also, all items of that list are
converted into the correct (typed) form before setting them.
2024-10-25 17:11:06 +02:00
Felix Moessbauer
6ceae7c72f setPref: pass typed value instead of string
By passing a typed value we can conserve the type information to later
use that to convert it back into the correct protobuf type. The type
conversion is now done inside setPref.
2024-10-25 17:11:06 +02:00
Felix Moessbauer
4c29d7dd0f refactor camel and snake naming in setPref
Same change as done in getPerf to have less branches (simplifies the
code).
2024-10-25 17:11:06 +02:00
Felix Moessbauer
839bbbcad2 config: correctly print byte and array types on get
When getting config values of type bytes or list (technically a protobuf
repeated container type), these were directly printed on the output.
However, the retrieved values could not be set by --set again, as the
format was different (e.g. python string representation of bytes vs.
base64 prefixed and encoded as expected by --set).

We fix this by adding a toStr utility function (similar to the fromStr)
function to convert byte types correctly to the base64 representation.
Further, we check if the type is repeated and apply this operation to
all values.
2024-10-25 17:11:06 +02:00
Felix Moessbauer
1abb9fb213 refactor getPref to use uniform printing logic
We add a local helper function to print a single setting. This is a
preparation to correctly print non-trivial types. The existing code
in getPref is ported over to use that function. By that, the output
of "wholeField" is changed slightly to always print the full path for
each setting (e.g. "security.serialEnabled" instead of
"security:\nserialEnabled"). This improves support for grepping on the
output.
2024-10-25 17:11:06 +02:00
Felix Moessbauer
7fcbbe9b80 refactor camel and snake naming in getPref
We have a lot of code duplication by checkin over and over again if
field should be named in camel or snake notation. We simplify this by
writing the choosen variant into a variable and just use that name
across the code.

No functional change.
2024-10-25 14:13:50 +02:00
Eric Swanson
073274cb00 Add missing camel to snake conversion 2024-10-21 22:06:43 -04:00
Ian McEwen
92a3986a8f Improve comments for pdoc 2024-10-21 16:39:47 -07:00
William Stearns
e335f12a3b attempts to fix mypy issues 2024-10-11 00:59:02 -04:00
William Stearns
0da405168f pylint cleanups 2024-10-10 23:49:20 -04:00
William Stearns
58d9039a04 another missing import 2024-10-10 16:49:06 -04:00
William Stearns
f77e788aa8 fix missing import 2024-10-10 16:33:07 -04:00
William Stearns
aba381fb54 Merge branch 'master' into wls_add_types 2024-10-08 23:28:25 -04:00
William Stearns
60de9dddb1 Remove references to BLEClient breaking CI checks 2024-07-09 19:54:01 -04:00
William Stearns
a29ee840f2 Adding mypy typing 2024-06-15 23:22:43 -04:00
24 changed files with 1211 additions and 1029 deletions

View File

@@ -16,7 +16,7 @@ Events are delivered using a publish-subscribe model, and you can subscribe to o
**[Getting Started Guide](https://meshtastic.org/docs/software/python/cli/installation)**
(Documentation/API Reference is currently offline)
**[API Documentation](https://python.meshtastic.org)**
## Call for Contributors

View File

@@ -80,7 +80,6 @@ from typing import *
import google.protobuf.json_format
import serial # type: ignore[import-untyped]
from dotmap import DotMap # type: ignore[import-untyped]
from google.protobuf.json_format import MessageToJson
from pubsub import pub # type: ignore[import-untyped]
from tabulate import tabulate
@@ -111,13 +110,13 @@ from . import (
LOCAL_ADDR = "^local"
"""A special ID that means the local node"""
BROADCAST_NUM = 0xFFFFFFFF
BROADCAST_NUM: int = 0xFFFFFFFF
"""if using 8 bit nodenums this will be shortened on the target"""
BROADCAST_ADDR = "^all"
"""A special ID that means broadcast"""
OUR_APP_VERSION = 20300
OUR_APP_VERSION: int = 20300
"""The numeric buildnumber (shared with android apps) specifying the
level of device code we are guaranteed to understand
@@ -134,7 +133,9 @@ class ResponseHandler(NamedTuple):
"""A pending response callback, waiting for a response to one of our messages"""
# requestId: int - used only as a key
#: a callable to call when a response is received
callback: Callable
#: Whether ACKs and NAKs should be passed to this handler
ackPermitted: bool = False
# FIXME, add timestamp and age out old requests
@@ -142,11 +143,11 @@ class ResponseHandler(NamedTuple):
class KnownProtocol(NamedTuple):
"""Used to automatically decode known protocol payloads"""
#: A descriptive name (e.g. "text", "user", "admin")
name: str
# portnum: int, now a key
# If set, will be called to prase as a protocol buffer
#: If set, will be called to parse as a protocol buffer
protobufFactory: Optional[Callable] = None
# If set, invoked as onReceive(interface, packet)
#: If set, invoked as onReceive(interface, packet)
onReceive: Optional[Callable] = None

View File

@@ -11,14 +11,23 @@ import os
import platform
import sys
import time
from typing import Optional
from typing import List, Optional
try:
import pyqrcode # type: ignore[import-untyped]
except ImportError as e:
pyqrcode = None
import pyqrcode # type: ignore[import-untyped]
import yaml
from google.protobuf.json_format import MessageToDict
from pubsub import pub # type: ignore[import-untyped]
import meshtastic.test
try:
import meshtastic.test
have_test = True
except ImportError as e:
have_test = False
import meshtastic.util
from meshtastic import BROADCAST_ADDR, mt_config, remote_hardware
from meshtastic.ble_interface import BLEInterface
@@ -42,7 +51,7 @@ except ImportError as e:
from meshtastic.protobuf import channel_pb2, config_pb2, portnums_pb2
from meshtastic.version import get_active_version
def onReceive(packet, interface):
def onReceive(packet, interface) -> None:
"""Callback invoked when a packet arrives"""
args = mt_config.args
try:
@@ -73,7 +82,7 @@ def onReceive(packet, interface):
print(f"Warning: There is no field {ex} in the packet.")
def onConnection(interface, topic=pub.AUTO_TOPIC): # pylint: disable=W0613
def onConnection(interface, topic=pub.AUTO_TOPIC) -> None: # pylint: disable=W0613
"""Callback invoked when we connect/disconnect from a radio"""
print(f"Connection changed: {topic.getName()}")
@@ -85,8 +94,16 @@ def checkChannel(interface: MeshInterface, channelIndex: int) -> bool:
return ch and ch.role != channel_pb2.Channel.Role.DISABLED
def getPref(node, comp_name):
def getPref(node, comp_name) -> bool:
"""Get a channel or preferences value"""
def _printSetting(config_type, uni_name, pref_value, repeated):
"""Pretty print the setting"""
if repeated:
pref_value = [meshtastic.util.toStr(v) for v in pref_value]
else:
pref_value = meshtastic.util.toStr(pref_value)
print(f"{str(config_type.name)}.{uni_name}: {str(pref_value)}")
logging.debug(f"{str(config_type.name)}.{uni_name}: {str(pref_value)}")
name = splitCompoundName(comp_name)
wholeField = name[0] == name[1] # We want the whole field
@@ -94,17 +111,18 @@ def getPref(node, comp_name):
camel_name = meshtastic.util.snake_to_camel(name[1])
# Note: protobufs has the keys in snake_case, so snake internally
snake_name = meshtastic.util.camel_to_snake(name[1])
uni_name = camel_name if mt_config.camel_case else snake_name
logging.debug(f"snake_name:{snake_name} camel_name:{camel_name}")
logging.debug(f"use camel:{mt_config.camel_case}")
# First validate the input
localConfig = node.localConfig
moduleConfig = node.moduleConfig
found = False
found: bool = False
for config in [localConfig, moduleConfig]:
objDesc = config.DESCRIPTOR
config_type = objDesc.fields_by_name.get(name[0])
pref = False
pref = "" #FIXME - is this correct to leave as an empty string if not found?
if config_type:
pref = config_type.message_type.fields_by_name.get(snake_name)
if pref or wholeField:
@@ -112,38 +130,26 @@ def getPref(node, comp_name):
break
if not found:
if mt_config.camel_case:
print(
f"{localConfig.__class__.__name__} and {moduleConfig.__class__.__name__} do not have an attribute {snake_name}."
)
else:
print(
f"{localConfig.__class__.__name__} and {moduleConfig.__class__.__name__} do not have attribute {snake_name}."
)
print(
f"{localConfig.__class__.__name__} and {moduleConfig.__class__.__name__} do not have attribute {uni_name}."
)
print("Choices are...")
printConfig(localConfig)
printConfig(moduleConfig)
return False
# Check if we need to request the config
if len(config.ListFields()) != 0:
if len(config.ListFields()) != 0 and not isinstance(pref, str): # if str, it's still the empty string, I think
# read the value
config_values = getattr(config, config_type.name)
if not wholeField:
pref_value = getattr(config_values, pref.name)
if mt_config.camel_case:
print(f"{str(config_type.name)}.{camel_name}: {str(pref_value)}")
logging.debug(
f"{str(config_type.name)}.{camel_name}: {str(pref_value)}"
)
else:
print(f"{str(config_type.name)}.{snake_name}: {str(pref_value)}")
logging.debug(
f"{str(config_type.name)}.{snake_name}: {str(pref_value)}"
)
repeated = pref.label == pref.LABEL_REPEATED
_printSetting(config_type, uni_name, pref_value, repeated)
else:
print(f"{str(config_type.name)}:\n{str(config_values)}")
logging.debug(f"{str(config_type.name)}: {str(config_values)}")
for field in config_values.ListFields():
repeated = field[0].label == field[0].LABEL_REPEATED
_printSetting(config_type, field[0].name, field[1], repeated)
else:
# Always show whole field for remote node
node.requestConfig(config_type)
@@ -151,16 +157,16 @@ def getPref(node, comp_name):
return True
def splitCompoundName(comp_name):
def splitCompoundName(comp_name: str) -> List[str]:
"""Split compound (dot separated) preference name into parts"""
name = comp_name.split(".")
name: List[str] = comp_name.split(".")
if len(name) < 2:
name[0] = comp_name
name.append(comp_name)
return name
def traverseConfig(config_root, config, interface_config):
def traverseConfig(config_root, config, interface_config) -> bool:
"""Iterate through current config level preferences and either traverse deeper if preference is a dict or set preference"""
snake_name = meshtastic.util.camel_to_snake(config_root)
for pref in config:
@@ -168,18 +174,19 @@ def traverseConfig(config_root, config, interface_config):
if isinstance(config[pref], dict):
traverseConfig(pref_name, config[pref], interface_config)
else:
setPref(interface_config, pref_name, str(config[pref]))
setPref(interface_config, pref_name, config[pref])
return True
def setPref(config, comp_name, valStr) -> bool:
def setPref(config, comp_name, raw_val) -> bool:
"""Set a channel or preferences value"""
name = splitCompoundName(comp_name)
snake_name = meshtastic.util.camel_to_snake(name[-1])
camel_name = meshtastic.util.snake_to_camel(name[-1])
uni_name = camel_name if mt_config.camel_case else snake_name
logging.debug(f"snake_name:{snake_name}")
logging.debug(f"camel_name:{camel_name}")
@@ -201,10 +208,13 @@ def setPref(config, comp_name, valStr) -> bool:
if (not pref) or (not config_type):
return False
val = meshtastic.util.fromStr(valStr)
logging.debug(f"valStr:{valStr} val:{val}")
if isinstance(raw_val, str):
val = meshtastic.util.fromStr(raw_val)
else:
val = raw_val
logging.debug(f"valStr:{raw_val} val:{val}")
if snake_name == "wifi_psk" and len(valStr) < 8:
if snake_name == "wifi_psk" and len(str(raw_val)) < 8:
print(f"Warning: network.wifi_psk must be 8 or more characters.")
return False
@@ -216,14 +226,9 @@ def setPref(config, comp_name, valStr) -> bool:
if e:
val = e.number
else:
if mt_config.camel_case:
print(
f"{name[0]}.{camel_name} does not have an enum called {val}, so you can not set it."
)
else:
print(
f"{name[0]}.{snake_name} does not have an enum called {val}, so you can not set it."
)
print(
f"{name[0]}.{uni_name} does not have an enum called {val}, so you can not set it."
)
print(f"Choices in sorted order are:")
names = []
for f in enumType.values:
@@ -244,7 +249,11 @@ def setPref(config, comp_name, valStr) -> bool:
except TypeError:
# The setter didn't like our arg type guess try again as a string
config_values = getattr(config_part, config_type.name)
setattr(config_values, pref.name, valStr)
setattr(config_values, pref.name, str(val))
elif type(val) == list:
new_vals = [meshtastic.util.fromStr(x) for x in val]
config_values = getattr(config, config_type.name)
getattr(config_values, pref.name)[:] = new_vals
else:
config_values = getattr(config, config_type.name)
if val == 0:
@@ -252,16 +261,14 @@ def setPref(config, comp_name, valStr) -> bool:
print(f"Clearing {pref.name} list")
del getattr(config_values, pref.name)[:]
else:
print(f"Adding '{val}' to the {pref.name} list")
print(f"Adding '{raw_val}' to the {pref.name} list")
cur_vals = [x for x in getattr(config_values, pref.name) if x not in [0, "", b""]]
cur_vals.append(val)
getattr(config_values, pref.name)[:] = cur_vals
return True
prefix = f"{'.'.join(name[0:-1])}." if config_type.message_type is not None else ""
if mt_config.camel_case:
print(f"Set {prefix}{camel_name} to {valStr}")
else:
print(f"Set {prefix}{snake_name} to {valStr}")
print(f"Set {prefix}{uni_name} to {raw_val}")
return True
@@ -481,6 +488,8 @@ def onConnected(interface):
"air_quality": "air_quality_metrics",
"airquality": "air_quality_metrics",
"power": "power_metrics",
"localstats": "local_stats",
"local_stats": "local_stats",
}
telemType = telemMap.get(args.request_telemetry, "device_metrics")
print(
@@ -891,8 +900,11 @@ def onConnected(interface):
else:
urldesc = "Primary channel URL"
print(f"{urldesc}: {url}")
qr = pyqrcode.create(url)
print(qr.terminal())
if pyqrcode is not None:
qr = pyqrcode.create(url)
print(qr.terminal())
else:
print("Install pyqrcode to view a QR code printed to terminal.")
log_set: Optional = None # type: ignore[annotation-unchecked]
# we need to keep a reference to the logset so it doesn't get GCed early
@@ -960,7 +972,7 @@ def onConnected(interface):
sys.exit(1)
def printConfig(config):
def printConfig(config) -> None:
"""print configuration"""
objDesc = config.DESCRIPTOR
for config_section in objDesc.fields:
@@ -977,12 +989,12 @@ def printConfig(config):
print(f" {temp_name}")
def onNode(node):
def onNode(node) -> None:
"""Callback invoked when the node DB changes"""
print(f"Node changed: {node}")
def subscribe():
def subscribe() -> None:
"""Subscribe to the topics the user probably wants to see, prints output to stdout"""
pub.subscribe(onReceive, "meshtastic.receive")
# pub.subscribe(onConnection, "meshtastic.connection")
@@ -993,7 +1005,7 @@ def subscribe():
# pub.subscribe(onNode, "meshtastic.node")
def export_config(interface):
def export_config(interface) -> str:
"""used in --export-config"""
configObj = {}
@@ -1025,7 +1037,7 @@ def export_config(interface):
if alt:
configObj["location"]["alt"] = alt
config = MessageToDict(interface.localNode.localConfig)
config = MessageToDict(interface.localNode.localConfig) #checkme - Used as a dictionary here and a string below
if config:
# Convert inner keys to correct snake/camelCase
prefs = {}
@@ -1044,7 +1056,7 @@ def export_config(interface):
for i in range(len(prefs[pref]['adminKey'])):
prefs[pref]['adminKey'][i] = 'base64:' + prefs[pref]['adminKey'][i]
if mt_config.camel_case:
configObj["config"] = config
configObj["config"] = config #Identical command here and 2 lines below?
else:
configObj["config"] = config
@@ -1060,10 +1072,11 @@ def export_config(interface):
else:
configObj["module_config"] = prefs
config = "# start of Meshtastic configure yaml\n"
config += yaml.dump(configObj)
print(config)
return config
config_txt = "# start of Meshtastic configure yaml\n" #checkme - "config" (now changed to config_out)
#was used as a string here and a Dictionary above
config_txt += yaml.dump(configObj)
print(config_txt)
return config_txt
def create_power_meter():
@@ -1142,11 +1155,14 @@ def common():
parser.print_help(sys.stderr)
meshtastic.util.our_exit("", 1)
elif args.test:
result = meshtastic.test.testAll()
if not result:
meshtastic.util.our_exit("Warning: Test was not successful.")
if not have_test:
meshtastic.util.our_exit("Test module could not be important. Ensure you have the 'dotmap' module installed.")
else:
meshtastic.util.our_exit("Test was a success.", 0)
result = meshtastic.test.testAll()
if not result:
meshtastic.util.our_exit("Warning: Test was not successful.")
else:
meshtastic.util.our_exit("Test was a success.", 0)
else:
if args.seriallog == "stdout":
logfile = sys.stdout

View File

@@ -5,6 +5,7 @@ import atexit
import logging
import struct
import time
import io
from threading import Thread
from typing import List, Optional
@@ -34,9 +35,9 @@ class BLEInterface(MeshInterface):
self,
address: Optional[str],
noProto: bool = False,
debugOut=None,
debugOut: Optional[io.TextIOWrapper]=None,
noNodes: bool = False,
):
) -> None:
MeshInterface.__init__(
self, debugOut=debugOut, noProto=noProto, noNodes=noNodes
)
@@ -82,7 +83,7 @@ class BLEInterface(MeshInterface):
# Note: the on disconnected callback will call our self.close which will make us nicely wait for threads to exit
self._exit_handler = atexit.register(self.client.disconnect)
def from_num_handler(self, _, b): # pylint: disable=C0116
def from_num_handler(self, _, b: bytes) -> None: # pylint: disable=C0116
"""Handle callbacks for fromnum notify.
Note: this method does not need to be async because it is just setting a bool.
"""
@@ -150,9 +151,12 @@ class BLEInterface(MeshInterface):
)
return addressed_devices[0]
def _sanitize_address(address): # pylint: disable=E0213
def _sanitize_address(self, address: Optional[str]) -> Optional[str]: # pylint: disable=E0213
"Standardize BLE address by removing extraneous characters and lowercasing."
return address.replace("-", "").replace("_", "").replace(":", "").lower()
if address is None:
return None
else:
return address.replace("-", "").replace("_", "").replace(":", "").lower()
def connect(self, address: Optional[str] = None) -> "BLEClient":
"Connect to a device by address."
@@ -164,12 +168,16 @@ class BLEInterface(MeshInterface):
client.discover()
return client
def _receiveFromRadioImpl(self):
def _receiveFromRadioImpl(self) -> None:
while self._want_receive:
if self.should_read:
self.should_read = False
retries = 0
retries: int = 0
while self._want_receive:
if self.client is None:
logging.debug(f"BLE client is None, shutting down")
self._want_receive = False
continue
try:
b = bytes(self.client.read_gatt_char(FROMRADIO_UUID))
except BleakDBusError as e:
@@ -194,8 +202,8 @@ class BLEInterface(MeshInterface):
else:
time.sleep(0.01)
def _sendToRadioImpl(self, toRadio):
b = toRadio.SerializeToString()
def _sendToRadioImpl(self, toRadio) -> None:
b: bytes = toRadio.SerializeToString()
if b and self.client: # we silently ignore writes while we are shutting down
logging.debug(f"TORADIO write: {b.hex()}")
try:
@@ -211,7 +219,7 @@ class BLEInterface(MeshInterface):
time.sleep(0.01)
self.should_read = True
def close(self):
def close(self) -> None:
try:
MeshInterface.close(self)
except Exception as e:
@@ -236,7 +244,7 @@ class BLEInterface(MeshInterface):
class BLEClient:
"""Client for managing connection to a BLE device"""
def __init__(self, address=None, **kwargs):
def __init__(self, address=None, **kwargs) -> None:
self._eventLoop = asyncio.new_event_loop()
self._eventThread = Thread(
target=self._run_event_loop, name="BLEClient", daemon=True

View File

@@ -15,7 +15,11 @@ from decimal import Decimal
from typing import Any, Callable, Dict, List, Optional, Union
import google.protobuf.json_format
import print_color # type: ignore[import-untyped]
try:
import print_color # type: ignore[import-untyped]
except ImportError as e:
print_color = None
from pubsub import pub # type: ignore[import-untyped]
from tabulate import tabulate
@@ -153,7 +157,7 @@ class MeshInterface: # pylint: disable=R0902
@staticmethod
def _printLogLine(line, interface):
"""Print a line of log output."""
if interface.debugOut == sys.stdout:
if print_color is not None and interface.debugOut == sys.stdout:
# this isn't quite correct (could cause false positives), but currently our formatting differs between different log representations
if "DEBUG" in line:
print_color.print(line, color="cyan", end=None)
@@ -617,6 +621,8 @@ class MeshInterface: # pylint: disable=R0902
r.air_quality_metrics.CopyFrom(telemetry_pb2.AirQualityMetrics())
elif telemetryType == "power_metrics":
r.power_metrics.CopyFrom(telemetry_pb2.PowerMetrics())
elif telemetryType == "local_stats":
r.local_stats.CopyFrom(telemetry_pb2.LocalStats())
else: # fall through to device metrics
if self.nodesByNum is not None:
node = self.nodesByNum.get(self.localNode.nodeNum)

View File

@@ -13,6 +13,8 @@ with rather more easily once the code is simplified by this change.
"""
from typing import Any, Optional
def reset():
"""
Restore the namespace to pristine condition.
@@ -33,5 +35,5 @@ args = None
parser = None
channel_index = None
logfile = None
tunnelInstance = None
tunnelInstance: Optional[Any] = None
camel_case = False

View File

@@ -121,7 +121,7 @@ class Node:
)
return
if config_values is not None:
raw_config = getattr(getattr(adminMessage['raw'], oneof), field)
raw_config = getattr(getattr(adminMessage['raw'], oneof), camel_to_snake(field))
config_values.CopyFrom(raw_config)
print(f"{str(camel_to_snake(field))}:\n{str(config_values)}")

View File

@@ -256,7 +256,7 @@ class TAKPacket(google.protobuf.message.Message):
detail: builtins.bytes
"""
Generic CoT detail XML
May be compressed / truncated by the sender
May be compressed / truncated by the sender (EUD)
"""
@property
def contact(self) -> global___Contact:

View File

File diff suppressed because one or more lines are too long

View File

@@ -1202,6 +1202,18 @@ class Config(google.protobuf.message.Message):
"""
Singapore 923mhz
"""
PH_433: Config.LoRaConfig._RegionCode.ValueType # 19
"""
Philippines 433mhz
"""
PH_868: Config.LoRaConfig._RegionCode.ValueType # 20
"""
Philippines 868mhz
"""
PH_915: Config.LoRaConfig._RegionCode.ValueType # 21
"""
Philippines 915mhz
"""
class RegionCode(_RegionCode, metaclass=_RegionCodeEnumTypeWrapper): ...
UNSET: Config.LoRaConfig.RegionCode.ValueType # 0
@@ -1280,6 +1292,18 @@ class Config(google.protobuf.message.Message):
"""
Singapore 923mhz
"""
PH_433: Config.LoRaConfig.RegionCode.ValueType # 19
"""
Philippines 433mhz
"""
PH_868: Config.LoRaConfig.RegionCode.ValueType # 20
"""
Philippines 868mhz
"""
PH_915: Config.LoRaConfig.RegionCode.ValueType # 21
"""
Philippines 915mhz
"""
class _ModemPreset:
ValueType = typing.NewType("ValueType", builtins.int)

View File

File diff suppressed because one or more lines are too long

View File

@@ -835,6 +835,9 @@ class ModuleConfig(google.protobuf.message.Message):
POWER_MEASUREMENT_ENABLED_FIELD_NUMBER: builtins.int
POWER_UPDATE_INTERVAL_FIELD_NUMBER: builtins.int
POWER_SCREEN_ENABLED_FIELD_NUMBER: builtins.int
HEALTH_MEASUREMENT_ENABLED_FIELD_NUMBER: builtins.int
HEALTH_UPDATE_INTERVAL_FIELD_NUMBER: builtins.int
HEALTH_SCREEN_ENABLED_FIELD_NUMBER: builtins.int
device_update_interval: builtins.int
"""
Interval in seconds of how often we should try to send our
@@ -870,18 +873,30 @@ class ModuleConfig(google.protobuf.message.Message):
"""
power_measurement_enabled: builtins.bool
"""
Interval in seconds of how often we should try to send our
air quality metrics to the mesh
Enable/disable Power metrics
"""
power_update_interval: builtins.int
"""
Interval in seconds of how often we should try to send our
air quality metrics to the mesh
power metrics to the mesh
"""
power_screen_enabled: builtins.bool
"""
Enable/Disable the power measurement module on-device display
"""
health_measurement_enabled: builtins.bool
"""
Preferences for the (Health) Telemetry Module
Enable/Disable the telemetry measurement module measurement collection
"""
health_update_interval: builtins.int
"""
Interval in seconds of how often we should try to send our
air quality metrics to the mesh
health metrics to the mesh
"""
health_screen_enabled: builtins.bool
"""
Enable/Disable the health telemetry module on-device display
"""
def __init__(
self,
@@ -896,8 +911,11 @@ class ModuleConfig(google.protobuf.message.Message):
power_measurement_enabled: builtins.bool = ...,
power_update_interval: builtins.int = ...,
power_screen_enabled: builtins.bool = ...,
health_measurement_enabled: builtins.bool = ...,
health_update_interval: builtins.int = ...,
health_screen_enabled: builtins.bool = ...,
) -> None: ...
def ClearField(self, field_name: typing.Literal["air_quality_enabled", b"air_quality_enabled", "air_quality_interval", b"air_quality_interval", "device_update_interval", b"device_update_interval", "environment_display_fahrenheit", b"environment_display_fahrenheit", "environment_measurement_enabled", b"environment_measurement_enabled", "environment_screen_enabled", b"environment_screen_enabled", "environment_update_interval", b"environment_update_interval", "power_measurement_enabled", b"power_measurement_enabled", "power_screen_enabled", b"power_screen_enabled", "power_update_interval", b"power_update_interval"]) -> None: ...
def ClearField(self, field_name: typing.Literal["air_quality_enabled", b"air_quality_enabled", "air_quality_interval", b"air_quality_interval", "device_update_interval", b"device_update_interval", "environment_display_fahrenheit", b"environment_display_fahrenheit", "environment_measurement_enabled", b"environment_measurement_enabled", "environment_screen_enabled", b"environment_screen_enabled", "environment_update_interval", b"environment_update_interval", "health_measurement_enabled", b"health_measurement_enabled", "health_screen_enabled", b"health_screen_enabled", "health_update_interval", b"health_update_interval", "power_measurement_enabled", b"power_measurement_enabled", "power_screen_enabled", b"power_screen_enabled", "power_update_interval", b"power_update_interval"]) -> None: ...
@typing.final
class CannedMessageConfig(google.protobuf.message.Message):

View File

File diff suppressed because one or more lines are too long

View File

@@ -143,6 +143,14 @@ class _TelemetrySensorTypeEnumTypeWrapper(google.protobuf.internal.enum_type_wra
"""
Custom I2C sensor implementation based on https://github.com/meshtastic/i2c-sensor
"""
MAX30102: _TelemetrySensorType.ValueType # 30
"""
MAX30102 Pulse Oximeter and Heart-Rate Sensor
"""
MLX90614: _TelemetrySensorType.ValueType # 31
"""
MLX90614 non-contact IR temperature sensor.
"""
class TelemetrySensorType(_TelemetrySensorType, metaclass=_TelemetrySensorTypeEnumTypeWrapper):
"""
@@ -269,6 +277,14 @@ CUSTOM_SENSOR: TelemetrySensorType.ValueType # 29
"""
Custom I2C sensor implementation based on https://github.com/meshtastic/i2c-sensor
"""
MAX30102: TelemetrySensorType.ValueType # 30
"""
MAX30102 Pulse Oximeter and Heart-Rate Sensor
"""
MLX90614: TelemetrySensorType.ValueType # 31
"""
MLX90614 non-contact IR temperature sensor.
"""
global___TelemetrySensorType = TelemetrySensorType
@typing.final
@@ -677,6 +693,9 @@ class LocalStats(google.protobuf.message.Message):
NUM_PACKETS_RX_BAD_FIELD_NUMBER: builtins.int
NUM_ONLINE_NODES_FIELD_NUMBER: builtins.int
NUM_TOTAL_NODES_FIELD_NUMBER: builtins.int
NUM_RX_DUPE_FIELD_NUMBER: builtins.int
NUM_TX_RELAY_FIELD_NUMBER: builtins.int
NUM_TX_RELAY_CANCELED_FIELD_NUMBER: builtins.int
uptime_seconds: builtins.int
"""
How long the device has been running since the last reboot (in seconds)
@@ -695,7 +714,7 @@ class LocalStats(google.protobuf.message.Message):
"""
num_packets_rx: builtins.int
"""
Number of packets received good
Number of packets received (both good and bad)
"""
num_packets_rx_bad: builtins.int
"""
@@ -709,6 +728,20 @@ class LocalStats(google.protobuf.message.Message):
"""
Number of nodes total
"""
num_rx_dupe: builtins.int
"""
Number of received packets that were duplicates (due to multiple nodes relaying).
If this number is high, there are nodes in the mesh relaying packets when it's unnecessary, for example due to the ROUTER/REPEATER role.
"""
num_tx_relay: builtins.int
"""
Number of packets we transmitted that were a relay for others (not originating from ourselves).
"""
num_tx_relay_canceled: builtins.int
"""
Number of times we canceled a packet to be relayed, because someone else did it before us.
This will always be zero for ROUTERs/REPEATERs. If this number is high, some other node(s) is/are relaying faster than you.
"""
def __init__(
self,
*,
@@ -720,11 +753,55 @@ class LocalStats(google.protobuf.message.Message):
num_packets_rx_bad: builtins.int = ...,
num_online_nodes: builtins.int = ...,
num_total_nodes: builtins.int = ...,
num_rx_dupe: builtins.int = ...,
num_tx_relay: builtins.int = ...,
num_tx_relay_canceled: builtins.int = ...,
) -> None: ...
def ClearField(self, field_name: typing.Literal["air_util_tx", b"air_util_tx", "channel_utilization", b"channel_utilization", "num_online_nodes", b"num_online_nodes", "num_packets_rx", b"num_packets_rx", "num_packets_rx_bad", b"num_packets_rx_bad", "num_packets_tx", b"num_packets_tx", "num_total_nodes", b"num_total_nodes", "uptime_seconds", b"uptime_seconds"]) -> None: ...
def ClearField(self, field_name: typing.Literal["air_util_tx", b"air_util_tx", "channel_utilization", b"channel_utilization", "num_online_nodes", b"num_online_nodes", "num_packets_rx", b"num_packets_rx", "num_packets_rx_bad", b"num_packets_rx_bad", "num_packets_tx", b"num_packets_tx", "num_rx_dupe", b"num_rx_dupe", "num_total_nodes", b"num_total_nodes", "num_tx_relay", b"num_tx_relay", "num_tx_relay_canceled", b"num_tx_relay_canceled", "uptime_seconds", b"uptime_seconds"]) -> None: ...
global___LocalStats = LocalStats
@typing.final
class HealthMetrics(google.protobuf.message.Message):
"""
Health telemetry metrics
"""
DESCRIPTOR: google.protobuf.descriptor.Descriptor
HEART_BPM_FIELD_NUMBER: builtins.int
SPO2_FIELD_NUMBER: builtins.int
TEMPERATURE_FIELD_NUMBER: builtins.int
heart_bpm: builtins.int
"""
Heart rate (beats per minute)
"""
spO2: builtins.int
"""
SpO2 (blood oxygen saturation) level
"""
temperature: builtins.float
"""
Body temperature in degrees Celsius
"""
def __init__(
self,
*,
heart_bpm: builtins.int | None = ...,
spO2: builtins.int | None = ...,
temperature: builtins.float | None = ...,
) -> None: ...
def HasField(self, field_name: typing.Literal["_heart_bpm", b"_heart_bpm", "_spO2", b"_spO2", "_temperature", b"_temperature", "heart_bpm", b"heart_bpm", "spO2", b"spO2", "temperature", b"temperature"]) -> builtins.bool: ...
def ClearField(self, field_name: typing.Literal["_heart_bpm", b"_heart_bpm", "_spO2", b"_spO2", "_temperature", b"_temperature", "heart_bpm", b"heart_bpm", "spO2", b"spO2", "temperature", b"temperature"]) -> None: ...
@typing.overload
def WhichOneof(self, oneof_group: typing.Literal["_heart_bpm", b"_heart_bpm"]) -> typing.Literal["heart_bpm"] | None: ...
@typing.overload
def WhichOneof(self, oneof_group: typing.Literal["_spO2", b"_spO2"]) -> typing.Literal["spO2"] | None: ...
@typing.overload
def WhichOneof(self, oneof_group: typing.Literal["_temperature", b"_temperature"]) -> typing.Literal["temperature"] | None: ...
global___HealthMetrics = HealthMetrics
@typing.final
class Telemetry(google.protobuf.message.Message):
"""
@@ -739,6 +816,7 @@ class Telemetry(google.protobuf.message.Message):
AIR_QUALITY_METRICS_FIELD_NUMBER: builtins.int
POWER_METRICS_FIELD_NUMBER: builtins.int
LOCAL_STATS_FIELD_NUMBER: builtins.int
HEALTH_METRICS_FIELD_NUMBER: builtins.int
time: builtins.int
"""
Seconds since 1970 - or 0 for unknown/unset
@@ -773,6 +851,12 @@ class Telemetry(google.protobuf.message.Message):
Local device mesh statistics
"""
@property
def health_metrics(self) -> global___HealthMetrics:
"""
Health telemetry metrics
"""
def __init__(
self,
*,
@@ -782,10 +866,11 @@ class Telemetry(google.protobuf.message.Message):
air_quality_metrics: global___AirQualityMetrics | None = ...,
power_metrics: global___PowerMetrics | None = ...,
local_stats: global___LocalStats | None = ...,
health_metrics: global___HealthMetrics | None = ...,
) -> None: ...
def HasField(self, field_name: typing.Literal["air_quality_metrics", b"air_quality_metrics", "device_metrics", b"device_metrics", "environment_metrics", b"environment_metrics", "local_stats", b"local_stats", "power_metrics", b"power_metrics", "variant", b"variant"]) -> builtins.bool: ...
def ClearField(self, field_name: typing.Literal["air_quality_metrics", b"air_quality_metrics", "device_metrics", b"device_metrics", "environment_metrics", b"environment_metrics", "local_stats", b"local_stats", "power_metrics", b"power_metrics", "time", b"time", "variant", b"variant"]) -> None: ...
def WhichOneof(self, oneof_group: typing.Literal["variant", b"variant"]) -> typing.Literal["device_metrics", "environment_metrics", "air_quality_metrics", "power_metrics", "local_stats"] | None: ...
def HasField(self, field_name: typing.Literal["air_quality_metrics", b"air_quality_metrics", "device_metrics", b"device_metrics", "environment_metrics", b"environment_metrics", "health_metrics", b"health_metrics", "local_stats", b"local_stats", "power_metrics", b"power_metrics", "variant", b"variant"]) -> builtins.bool: ...
def ClearField(self, field_name: typing.Literal["air_quality_metrics", b"air_quality_metrics", "device_metrics", b"device_metrics", "environment_metrics", b"environment_metrics", "health_metrics", b"health_metrics", "local_stats", b"local_stats", "power_metrics", b"power_metrics", "time", b"time", "variant", b"variant"]) -> None: ...
def WhichOneof(self, oneof_group: typing.Literal["variant", b"variant"]) -> typing.Literal["device_metrics", "environment_metrics", "air_quality_metrics", "power_metrics", "local_stats", "health_metrics"] | None: ...
global___Telemetry = Telemetry

View File

@@ -8,7 +8,7 @@ from meshtastic.protobuf import portnums_pb2, remote_hardware_pb2
from meshtastic.util import our_exit
def onGPIOreceive(packet, interface):
def onGPIOreceive(packet, interface) -> None:
"""Callback for received GPIO responses"""
logging.debug(f"packet:{packet} interface:{interface}")
gpioValue = 0
@@ -37,7 +37,7 @@ class RemoteHardwareClient:
code for how you can connect to your own custom meshtastic services
"""
def __init__(self, iface):
def __init__(self, iface) -> None:
"""
Constructor

View File

@@ -5,7 +5,7 @@ import logging
import platform
import time
from typing import Optional
from typing import List, Optional
import serial # type: ignore[import-untyped]
@@ -19,7 +19,7 @@ if platform.system() != "Windows":
class SerialInterface(StreamInterface):
"""Interface class for meshtastic devices over a serial link"""
def __init__(self, devPath: Optional[str]=None, debugOut=None, noProto=False, connectNow=True, noNodes: bool=False):
def __init__(self, devPath: Optional[str]=None, debugOut=None, noProto: bool=False, connectNow: bool=True, noNodes: bool=False) -> None:
"""Constructor, opens a connection to a specified serial port, or if unspecified try to
find one Meshtastic device by probing
@@ -32,13 +32,13 @@ class SerialInterface(StreamInterface):
self.devPath: Optional[str] = devPath
if self.devPath is None:
ports = meshtastic.util.findPorts(True)
ports: List[str] = meshtastic.util.findPorts(True)
logging.debug(f"ports:{ports}")
if len(ports) == 0:
print("No Serial Meshtastic device detected, attempting TCP connection on localhost.")
return
elif len(ports) > 1:
message = "Warning: Multiple serial ports were detected so one serial port must be specified with the '--port'.\n"
message: str = "Warning: Multiple serial ports were detected so one serial port must be specified with the '--port'.\n"
message += f" Ports detected:{ports}"
meshtastic.util.our_exit(message)
else:
@@ -59,14 +59,14 @@ class SerialInterface(StreamInterface):
self.stream = serial.Serial(
self.devPath, 115200, exclusive=True, timeout=0.5, write_timeout=0
)
self.stream.flush()
self.stream.flush() # type: ignore[attr-defined]
time.sleep(0.1)
StreamInterface.__init__(
self, debugOut=debugOut, noProto=noProto, connectNow=connectNow, noNodes=noNodes
)
def close(self):
def close(self) -> None:
"""Close a connection to the device"""
if self.stream: # Stream can be null if we were already closed
self.stream.flush() # FIXME: why are there these two flushes with 100ms sleeps? This shouldn't be necessary

View File

@@ -1,10 +1,13 @@
"""Stream Interface base class
"""
import io
import logging
import threading
import time
import traceback
from typing import Optional, cast
import serial # type: ignore[import-untyped]
from meshtastic.mesh_interface import MeshInterface
@@ -19,7 +22,7 @@ MAX_TO_FROM_RADIO_SIZE = 512
class StreamInterface(MeshInterface):
"""Interface class for meshtastic devices over a stream link (serial, TCP, etc)"""
def __init__(self, debugOut=None, noProto=False, connectNow=True, noNodes=False):
def __init__(self, debugOut: Optional[io.TextIOWrapper]=None, noProto: bool=False, connectNow: bool=True, noNodes: bool=False) -> None:
"""Constructor, opens a connection to self.stream
Keyword Arguments:
@@ -35,6 +38,7 @@ class StreamInterface(MeshInterface):
raise Exception( # pylint: disable=W0719
"StreamInterface is now abstract (to update existing code create SerialInterface instead)"
)
self.stream: Optional[serial.Serial] # only serial uses this, TCPInterface overrides the relevant methods instead
self._rxBuf = bytes() # empty
self._wantExit = False
@@ -52,7 +56,7 @@ class StreamInterface(MeshInterface):
if not noProto:
self.waitForConfig()
def connect(self):
def connect(self) -> None:
"""Connect to our radio
Normally this is called automatically by the constructor, but if you
@@ -63,7 +67,7 @@ class StreamInterface(MeshInterface):
# if the reading statemachine was parsing a bad packet make sure
# we write enough start bytes to force it to resync (we don't use START1
# because we want to ensure it is looking for START1)
p = bytearray([START2] * 32)
p: bytes = bytearray([START2] * 32)
self._writeBytes(p)
time.sleep(0.1) # wait 100ms to give device time to start running
@@ -74,7 +78,7 @@ class StreamInterface(MeshInterface):
if not self.noProto: # Wait for the db download if using the protocol
self._waitConnected()
def _disconnected(self):
def _disconnected(self) -> None:
"""We override the superclass implementation to close our port"""
MeshInterface._disconnected(self)
@@ -86,7 +90,7 @@ class StreamInterface(MeshInterface):
# pylint: disable=W0201
self.stream = None
def _writeBytes(self, b):
def _writeBytes(self, b: bytes) -> None:
"""Write an array of bytes to our stream and flush"""
if self.stream: # ignore writes when stream is closed
self.stream.write(b)
@@ -98,24 +102,24 @@ class StreamInterface(MeshInterface):
# we sleep here to give the TBeam a chance to work
time.sleep(0.1)
def _readBytes(self, length):
def _readBytes(self, length) -> Optional[bytes]:
"""Read an array of bytes from our stream"""
if self.stream:
return self.stream.read(length)
else:
return None
def _sendToRadioImpl(self, toRadio):
def _sendToRadioImpl(self, toRadio) -> None:
"""Send a ToRadio protobuf to the device"""
logging.debug(f"Sending: {stripnl(toRadio)}")
b = toRadio.SerializeToString()
bufLen = len(b)
b: bytes = toRadio.SerializeToString()
bufLen: int = len(b)
# We convert into a string, because the TCP code doesn't work with byte arrays
header = bytes([START1, START2, (bufLen >> 8) & 0xFF, bufLen & 0xFF])
logging.debug(f"sending header:{header} b:{b}")
header: bytes = bytes([START1, START2, (bufLen >> 8) & 0xFF, bufLen & 0xFF])
logging.debug(f"sending header:{header!r} b:{b!r}")
self._writeBytes(header + b)
def close(self):
def close(self) -> None:
"""Close a connection to the device"""
logging.debug("Closing stream")
MeshInterface.close(self)
@@ -142,7 +146,7 @@ class StreamInterface(MeshInterface):
else:
self.cur_log_line += utf
def __reader(self):
def __reader(self) -> None:
"""The reader thread that reads bytes from our stream"""
logging.debug("in __reader()")
empty = bytes()
@@ -150,13 +154,13 @@ class StreamInterface(MeshInterface):
try:
while not self._wantExit:
# logging.debug("reading character")
b = self._readBytes(1)
b: Optional[bytes] = self._readBytes(1)
# logging.debug("In reader loop")
# logging.debug(f"read returned {b}")
if len(b) > 0:
c = b[0]
if b is not None and len(cast(bytes, b)) > 0:
c: int = b[0]
# logging.debug(f'c:{c}')
ptr = len(self._rxBuf)
ptr: int = len(self._rxBuf)
# Assume we want to append this byte, fixme use bytearray instead
self._rxBuf = self._rxBuf + b

View File

@@ -3,7 +3,7 @@
# pylint: disable=R0917
import logging
import socket
from typing import Optional
from typing import Optional, cast
from meshtastic.stream_interface import StreamInterface
@@ -16,9 +16,9 @@ class TCPInterface(StreamInterface):
self,
hostname: str,
debugOut=None,
noProto=False,
connectNow=True,
portNumber=DEFAULT_TCP_PORT,
noProto: bool=False,
connectNow: bool=True,
portNumber: int=DEFAULT_TCP_PORT,
noNodes:bool=False,
):
"""Constructor, opens a connection to a specified IP address/hostname
@@ -29,14 +29,16 @@ class TCPInterface(StreamInterface):
self.stream = None
self.hostname = hostname
self.portNumber = portNumber
self.hostname: str = hostname
self.portNumber: int = portNumber
self.socket: Optional[socket.socket] = None
if connectNow:
logging.debug(f"Connecting to {hostname}") # type: ignore[str-bytes-safe]
server_address = (hostname, portNumber)
sock = socket.create_connection(server_address)
self.socket: Optional[socket.socket] = sock
server_address: tuple[str, int] = (hostname, portNumber)
sock: Optional[socket.socket] = socket.create_connection(server_address)
self.socket = sock
else:
self.socket = None
@@ -44,25 +46,26 @@ class TCPInterface(StreamInterface):
self, debugOut=debugOut, noProto=noProto, connectNow=connectNow, noNodes=noNodes
)
def _socket_shutdown(self):
def _socket_shutdown(self) -> None:
"""Shutdown the socket.
Note: Broke out this line so the exception could be unit tested.
"""
self.socket.shutdown(socket.SHUT_RDWR)
if self.socket: #mian: please check that this should be "if self.socket:"
cast(socket.socket, self.socket).shutdown(socket.SHUT_RDWR)
def myConnect(self):
def myConnect(self) -> None:
"""Connect to socket"""
server_address = (self.hostname, self.portNumber)
sock = socket.create_connection(server_address)
server_address: tuple[str, int] = (self.hostname, self.portNumber)
sock: Optional[socket.socket] = socket.create_connection(server_address)
self.socket = sock
def close(self):
def close(self) -> None:
"""Close a connection to the device"""
logging.debug("Closing TCP stream")
StreamInterface.close(self)
# Sometimes the socket read might be blocked in the reader thread.
# Therefore we force the shutdown by closing the socket here
self._wantExit = True
self._wantExit: bool = True
if not self.socket is None:
try:
self._socket_shutdown()
@@ -70,10 +73,14 @@ class TCPInterface(StreamInterface):
pass # Ignore errors in shutdown, because we might have a race with the server
self.socket.close()
def _writeBytes(self, b):
def _writeBytes(self, b: bytes) -> None:
"""Write an array of bytes to our stream and flush"""
self.socket.send(b)
if self.socket:
self.socket.send(b)
def _readBytes(self, length):
def _readBytes(self, length) -> Optional[bytes]:
"""Read an array of bytes from our stream"""
return self.socket.recv(length)
if self.socket:
return self.socket.recv(length)
else:
return None

View File

@@ -5,6 +5,9 @@ import logging
import sys
import time
import traceback
import io
from typing import List, Optional
from dotmap import DotMap # type: ignore[import-untyped]
from pubsub import pub # type: ignore[import-untyped]
@@ -15,19 +18,19 @@ from meshtastic.serial_interface import SerialInterface
from meshtastic.tcp_interface import TCPInterface
"""The interfaces we are using for our tests"""
interfaces = None
interfaces: List = []
"""A list of all packets we received while the current test was running"""
receivedPackets = None
receivedPackets: Optional[List] = None
testsRunning = False
testsRunning: bool = False
testNumber = 0
testNumber: int = 0
sendingInterface = None
def onReceive(packet, interface):
def onReceive(packet, interface) -> None:
"""Callback invoked when a packet arrives"""
if sendingInterface == interface:
pass
@@ -42,20 +45,20 @@ def onReceive(packet, interface):
receivedPackets.append(p)
def onNode(node):
def onNode(node) -> None:
"""Callback invoked when the node DB changes"""
print(f"Node changed: {node}")
def subscribe():
def subscribe() -> None:
"""Subscribe to the topics the user probably wants to see, prints output to stdout"""
pub.subscribe(onNode, "meshtastic.node")
def testSend(
fromInterface, toInterface, isBroadcast=False, asBinary=False, wantAck=False
):
fromInterface, toInterface, isBroadcast: bool=False, asBinary: bool=False, wantAck: bool=False
) -> bool:
"""
Sends one test packet between two nodes and then returns success or failure
@@ -93,16 +96,16 @@ def testSend(
return False # Failed to send
def runTests(numTests=50, wantAck=False, maxFailures=0):
def runTests(numTests: int=50, wantAck: bool=False, maxFailures: int=0) -> bool:
"""Run the tests."""
logging.info(f"Running {numTests} tests with wantAck={wantAck}")
numFail = 0
numSuccess = 0
numFail: int = 0
numSuccess: int = 0
for _ in range(numTests):
# pylint: disable=W0603
global testNumber
testNumber = testNumber + 1
isBroadcast = True
isBroadcast:bool = True
# asBinary=(i % 2 == 0)
success = testSend(
interfaces[0], interfaces[1], isBroadcast, asBinary=False, wantAck=wantAck
@@ -126,10 +129,10 @@ def runTests(numTests=50, wantAck=False, maxFailures=0):
return True
def testThread(numTests=50):
def testThread(numTests=50) -> bool:
"""Test thread"""
logging.info("Found devices, starting tests...")
result = runTests(numTests, wantAck=True)
result: bool = runTests(numTests, wantAck=True)
if result:
# Run another test
# Allow a few dropped packets
@@ -137,25 +140,25 @@ def testThread(numTests=50):
return result
def onConnection(topic=pub.AUTO_TOPIC):
def onConnection(topic=pub.AUTO_TOPIC) -> None:
"""Callback invoked when we connect/disconnect from a radio"""
print(f"Connection changed: {topic.getName()}")
def openDebugLog(portName):
def openDebugLog(portName) -> io.TextIOWrapper:
"""Open the debug log file"""
debugname = "log" + portName.replace("/", "_")
logging.info(f"Writing serial debugging to {debugname}")
return open(debugname, "w+", buffering=1, encoding="utf8")
def testAll(numTests=5):
def testAll(numTests: int=5) -> bool:
"""
Run a series of tests using devices we can find.
This is called from the cli with the "--test" option.
"""
ports = meshtastic.util.findPorts(True)
ports: List[str] = meshtastic.util.findPorts(True)
if len(ports) < 2:
meshtastic.util.our_exit(
"Warning: Must have at least two devices connected to USB."
@@ -175,7 +178,7 @@ def testAll(numTests=5):
)
logging.info("Ports opened, starting test")
result = testThread(numTests)
result: bool = testThread(numTests)
for i in interfaces:
i.close()
@@ -183,7 +186,7 @@ def testAll(numTests=5):
return result
def testSimulator():
def testSimulator() -> None:
"""
Assume that someone has launched meshtastic-native as a simulated node.
Talk to that node over TCP, do some operations and if they are successful
@@ -195,7 +198,7 @@ def testSimulator():
logging.basicConfig(level=logging.DEBUG)
logging.info("Connecting to simulator on localhost!")
try:
iface = TCPInterface("localhost")
iface: meshtastic.tcp_interface.TCPInterface = TCPInterface("localhost")
iface.showInfo()
iface.localNode.showInfo()
iface.localNode.exitSimulator()

View File

@@ -43,7 +43,7 @@ class Tunnel:
self.message = message
super().__init__(self.message)
def __init__(self, iface, subnet="10.115", netmask="255.255.0.0"):
def __init__(self, iface, subnet: str="10.115", netmask: str="255.255.0.0") -> None:
"""
Constructor

View File

@@ -11,7 +11,7 @@ import threading
import time
import traceback
from queue import Queue
from typing import List, NoReturn, Union
from typing import Any, Dict, List, NoReturn, Optional, Set, Tuple, Union
from google.protobuf.json_format import MessageToJson
from google.protobuf.message import Message
@@ -31,7 +31,7 @@ from meshtastic.version import get_active_version
0925 Lakeview Research Saleae Logic (logic analyzer)
04b4:602a Cypress Semiconductor Corp. Hantek DSO-6022BL (oscilloscope)
"""
blacklistVids = dict.fromkeys([0x1366, 0x0483, 0x1915, 0x0925, 0x04b4])
blacklistVids: Dict = dict.fromkeys([0x1366, 0x0483, 0x1915, 0x0925, 0x04b4])
"""Some devices are highly likely to be meshtastic.
0x239a RAK4631
@@ -39,21 +39,21 @@ blacklistVids = dict.fromkeys([0x1366, 0x0483, 0x1915, 0x0925, 0x04b4])
whitelistVids = dict.fromkeys([0x239a, 0x303a])
def quoteBooleans(a_string):
def quoteBooleans(a_string: str) -> str:
"""Quote booleans
given a string that contains ": true", replace with ": 'true'" (or false)
"""
tmp = a_string.replace(": true", ": 'true'")
tmp: str = a_string.replace(": true", ": 'true'")
tmp = tmp.replace(": false", ": 'false'")
return tmp
def genPSK256():
def genPSK256() -> bytes:
"""Generate a random preshared key"""
return os.urandom(32)
def fromPSK(valstr):
def fromPSK(valstr: str) -> Any:
"""A special version of fromStr that assumes the user is trying to set a PSK.
In that case we also allow "none", "default" or "random" (to have python generate one), or simpleN
"""
@@ -70,7 +70,7 @@ def fromPSK(valstr):
return fromStr(valstr)
def fromStr(valstr):
def fromStr(valstr: str) -> Any:
"""Try to parse as int, float or bool (and fallback to a string as last resort)
Returns: an int, bool, float, str or byte array (for strings of hex digits)
@@ -78,6 +78,7 @@ def fromStr(valstr):
Args:
valstr (string): A user provided string
"""
val: Any
if len(valstr) == 0: # Treat an emptystring as an empty bytes
val = bytes()
elif valstr.startswith("0x"):
@@ -100,7 +101,15 @@ def fromStr(valstr):
return val
def pskToString(psk: bytes):
def toStr(raw_value):
"""Convert a value to a string that can be used in a config file"""
if isinstance(raw_value, bytes):
return "base64:" + base64.b64encode(raw_value).decode("utf-8")
return str(raw_value)
def pskToString(psk: bytes) -> str:
"""Given an array of PSK bytes, decode them into a human readable (but privacy protecting) string"""
if len(psk) == 0:
return "unencrypted"
@@ -122,12 +131,12 @@ def stripnl(s) -> str:
return " ".join(s.split())
def fixme(message):
def fixme(message: str) -> None:
"""Raise an exception for things that needs to be fixed"""
raise Exception(f"FIXME: {message}") # pylint: disable=W0719
def catchAndIgnore(reason, closure):
def catchAndIgnore(reason: str, closure) -> None:
"""Call a closure but if it throws an exception print it and continue"""
try:
closure()
@@ -145,7 +154,7 @@ def findPorts(eliminate_duplicates: bool=False) -> List[str]:
all_ports = serial.tools.list_ports.comports()
# look for 'likely' meshtastic devices
ports = list(
ports: List = list(
map(
lambda port: port.device,
filter(
@@ -184,12 +193,12 @@ class dotdict(dict):
class Timeout:
"""Timeout class"""
def __init__(self, maxSecs: int=20):
def __init__(self, maxSecs: int=20) -> None:
self.expireTime: Union[int, float] = 0
self.sleepInterval: float = 0.1
self.expireTimeout: int = maxSecs
def reset(self):
def reset(self) -> None:
"""Restart the waitForSet timer"""
self.expireTime = time.time() + self.expireTimeout
@@ -248,7 +257,7 @@ class Timeout:
class Acknowledgment:
"A class that records which type of acknowledgment was just received, if any."
def __init__(self):
def __init__(self) -> None:
"""initialize"""
self.receivedAck = False
self.receivedNak = False
@@ -257,7 +266,7 @@ class Acknowledgment:
self.receivedTelemetry = False
self.receivedPosition = False
def reset(self):
def reset(self) -> None:
"""reset"""
self.receivedAck = False
self.receivedNak = False
@@ -270,18 +279,18 @@ class Acknowledgment:
class DeferredExecution:
"""A thread that accepts closures to run, and runs them as they are received"""
def __init__(self, name):
self.queue = Queue()
def __init__(self, name) -> None:
self.queue: Queue = Queue()
# this thread must be marked as daemon, otherwise it will prevent clients from exiting
self.thread = threading.Thread(target=self._run, args=(), name=name, daemon=True)
self.thread.daemon = True
self.thread.start()
def queueWork(self, runnable):
def queueWork(self, runnable) -> None:
"""Queue up the work"""
self.queue.put(runnable)
def _run(self):
def _run(self) -> None:
while True:
try:
o = self.queue.get()
@@ -301,7 +310,7 @@ def our_exit(message, return_value=1) -> NoReturn:
sys.exit(return_value)
def support_info():
def support_info() -> None:
"""Print out info that helps troubleshooting of the cli."""
print("")
print("If having issues with meshtastic cli or python library")
@@ -330,7 +339,7 @@ def support_info():
print("Please add the output from the command: meshtastic --info")
def remove_keys_from_dict(keys, adict):
def remove_keys_from_dict(keys: Union[Tuple, List, Set], adict: Dict) -> Dict:
"""Return a dictionary without some keys in it.
Will removed nested keys.
"""
@@ -345,33 +354,33 @@ def remove_keys_from_dict(keys, adict):
return adict
def hexstr(barray):
def hexstr(barray: bytes) -> str:
"""Print a string of hex digits"""
return ":".join(f"{x:02x}" for x in barray)
def ipstr(barray):
def ipstr(barray: bytes) -> str:
"""Print a string of ip digits"""
return ".".join(f"{x}" for x in barray)
def readnet_u16(p, offset):
def readnet_u16(p, offset: int) -> int:
"""Read big endian u16 (network byte order)"""
return p[offset] * 256 + p[offset + 1]
def convert_mac_addr(val):
def convert_mac_addr(val: str) -> Union[str, bytes]:
"""Convert the base 64 encoded value to a mac address
val - base64 encoded value (ex: '/c0gFyhb'))
returns: a string formatted like a mac address (ex: 'fd:cd:20:17:28:5b')
"""
if not re.match("[0-9a-f]{2}([-:]?)[0-9a-f]{2}(\\1[0-9a-f]{2}){4}$", val):
val_as_bytes = base64.b64decode(val)
val_as_bytes: bytes = base64.b64decode(val)
return hexstr(val_as_bytes)
return val
def snake_to_camel(a_string):
def snake_to_camel(a_string: str) -> str:
"""convert snake_case to camelCase"""
# split underscore using split
temp = a_string.split("_")
@@ -380,16 +389,16 @@ def snake_to_camel(a_string):
return result
def camel_to_snake(a_string):
def camel_to_snake(a_string: str) -> str:
"""convert camelCase to snake_case"""
return "".join(["_" + i.lower() if i.isupper() else i for i in a_string]).lstrip(
"_"
)
def detect_supported_devices():
def detect_supported_devices() -> Set:
"""detect supported devices based on vendor id"""
system = platform.system()
system: str = platform.system()
# print(f'system:{system}')
possible_devices = set()
@@ -447,9 +456,9 @@ def detect_supported_devices():
return possible_devices
def detect_windows_needs_driver(sd, print_reason=False):
def detect_windows_needs_driver(sd, print_reason=False) -> bool:
"""detect if Windows user needs to install driver for a supported device"""
need_to_install_driver = False
need_to_install_driver: bool = False
if sd:
system = platform.system()
@@ -475,7 +484,7 @@ def detect_windows_needs_driver(sd, print_reason=False):
return need_to_install_driver
def eliminate_duplicate_port(ports):
def eliminate_duplicate_port(ports: List) -> List:
"""Sometimes we detect 2 serial ports, but we really only need to use one of the ports.
ports is a list of ports
@@ -508,9 +517,9 @@ def eliminate_duplicate_port(ports):
return new_ports
def is_windows11():
def is_windows11() -> bool:
"""Detect if Windows 11"""
is_win11 = False
is_win11: bool = False
if platform.system() == "Windows":
if float(platform.release()) >= 10.0:
patch = platform.version().split(".")[2]
@@ -524,7 +533,7 @@ def is_windows11():
return is_win11
def get_unique_vendor_ids():
def get_unique_vendor_ids() -> Set[str]:
"""Return a set of unique vendor ids"""
vids = set()
for d in supported_devices:
@@ -533,7 +542,7 @@ def get_unique_vendor_ids():
return vids
def get_devices_with_vendor_id(vid):
def get_devices_with_vendor_id(vid: str) -> Set: #Set[SupportedDevice]
"""Return a set of unique devices with the vendor id"""
sd = set()
for d in supported_devices:
@@ -542,11 +551,11 @@ def get_devices_with_vendor_id(vid):
return sd
def active_ports_on_supported_devices(sds, eliminate_duplicates=False):
def active_ports_on_supported_devices(sds, eliminate_duplicates=False) -> Set[str]:
"""Return a set of active ports based on the supplied supported devices"""
ports = set()
baseports = set()
system = platform.system()
ports: Set = set()
baseports: Set = set()
system: str = platform.system()
# figure out what possible base ports there are
for d in sds:
@@ -604,13 +613,13 @@ def active_ports_on_supported_devices(sds, eliminate_duplicates=False):
for com_port in com_ports:
ports.add(com_port)
if eliminate_duplicates:
ports = eliminate_duplicate_port(list(ports))
ports.sort()
ports = set(ports)
portlist: List = eliminate_duplicate_port(list(ports))
portlist.sort()
ports = set(portlist)
return ports
def detect_windows_port(sd):
def detect_windows_port(sd) -> Set[str]: #"sd" is a SupportedDevice from meshtastic.supported_device
"""detect if Windows port"""
ports = set()
@@ -635,20 +644,26 @@ def detect_windows_port(sd):
return ports
def check_if_newer_version():
def check_if_newer_version() -> Optional[str]:
"""Check pip to see if we are running the latest version."""
pypi_version = None
pypi_version: Optional[str] = None
try:
url = "https://pypi.org/pypi/meshtastic/json"
url: str = "https://pypi.org/pypi/meshtastic/json"
data = requests.get(url, timeout=5).json()
pypi_version = data["info"]["version"]
except Exception:
pass
act_version = get_active_version()
if pypi_version is None:
return None
try:
parsed_act_version = pkg_version.parse(act_version)
parsed_pypi_version = pkg_version.parse(pypi_version)
#Note: if handed "None" when we can't download the pypi_version,
#this gets a TypeError:
#"TypeError: expected string or bytes-like object, got 'NoneType'"
#Handle that below?
except pkg_version.InvalidVersion:
return pypi_version
@@ -660,5 +675,8 @@ def check_if_newer_version():
def message_to_json(message: Message, multiline: bool=False) -> str:
"""Return protobuf message as JSON. Always print all fields, even when not present in data."""
json = MessageToJson(message, always_print_fields_with_no_presence=True)
try:
json = MessageToJson(message, always_print_fields_with_no_presence=True)
except TypeError:
json = MessageToJson(message, including_default_value_fields=True) # type: ignore[call-arg] # pylint: disable=E1123
return stripnl(json) if not multiline else json

1524
poetry.lock generated
View File

File diff suppressed because it is too large Load Diff

View File

@@ -1,6 +1,6 @@
[tool.poetry]
name = "meshtastic"
version = "2.5.3"
version = "2.5.6"
description = "Python API & client shell for talking to Meshtastic devices"
authors = ["Meshtastic Developers <contact@meshtastic.org>"]
license = "GPL-3.0-only"
@@ -9,19 +9,16 @@ readme = "README.md"
[tool.poetry.dependencies]
python = "^3.9,<3.14" # 3.9 is needed for pandas, bleak requires <3.14
pyserial = "^3.5"
protobuf = ">=5.26.0"
dotmap = "^1.3.30"
pexpect = "^4.9.0"
pyqrcode = "^1.2.1"
protobuf = ">=4.21.12"
tabulate = "^0.9.0"
webencodings = "^0.5.1"
requests = "^2.31.0"
pyparsing = "^3.1.2"
pyyaml = "^6.0.1"
pypubsub = "^4.0.3"
bleak = "^0.22.3"
packaging = "^24.0"
print-color = "^0.4.6"
pyqrcode = { version = "^1.2.1", optional = true }
dotmap = { version = "^1.3.30", optional = true }
print-color = { version = "^0.4.6", optional = true }
dash = { version = "^2.17.1", optional = true }
pytap2 = { version = "^2.3.0", optional = true }
dash-bootstrap-components = { version = "^1.6.0", optional = true }
@@ -37,7 +34,7 @@ autopep8 = "^2.1.0"
pylint = "^3.2.3"
pyinstaller = "^6.8.0"
mypy = "^1.10.0"
mypy-protobuf = "^3.6.0"
mypy-protobuf = "^3.3.0"
types-protobuf = "^5.26.0.20240422"
types-tabulate = "^0.9.0.20240106"
types-requests = "^2.31.0.20240406"
@@ -67,6 +64,7 @@ ipywidgets = "^8.1.3"
jupyterlab-widgets = "^3.0.11"
[tool.poetry.extras]
cli = ["pyqrcode", "print-color", "dotmap"]
tunnel = ["pytap2"]
analysis = ["dash", "dash-bootstrap-components", "pandas", "pandas-stubs"]