mirror of
https://github.com/meshtastic/python.git
synced 2026-01-13 18:28:03 -05:00
@@ -362,9 +362,11 @@ class MeshInterface:
|
||||
|
||||
def waitForConfig(self):
|
||||
"""Block until radio config is received. Returns True if config has been received."""
|
||||
success = self._timeout.waitForSet(self, attrs=('myInfo', 'nodes')) and self.localNode.waitForConfig()
|
||||
if not success:
|
||||
raise Exception("Timed out waiting for interface config")
|
||||
# TODO
|
||||
return True
|
||||
#success = self._timeout.waitForSet(self, attrs=('myInfo', 'nodes')) and self.localNode.waitForConfig()
|
||||
#if not success:
|
||||
#raise Exception("Timed out waiting for interface config")
|
||||
|
||||
def getMyNodeInfo(self):
|
||||
"""Get info about my node."""
|
||||
@@ -422,8 +424,10 @@ class MeshInterface:
|
||||
"""We need to send a heartbeat message to the device every X seconds"""
|
||||
def callback():
|
||||
self.heartbeatTimer = None
|
||||
prefs = self.localNode.radioConfig.preferences
|
||||
i = prefs.phone_timeout_secs / 2
|
||||
# TODO
|
||||
# prefs = self.localNode.radioConfig.preferences
|
||||
#i = prefs.phone_timeout_secs / 2
|
||||
i = 0
|
||||
logging.debug(f"Sending heartbeat, interval {i}")
|
||||
if i != 0:
|
||||
self.heartbeatTimer = threading.Timer(i, callback)
|
||||
@@ -540,6 +544,21 @@ class MeshInterface:
|
||||
MeshInterface._disconnected(self)
|
||||
|
||||
self._startConfig() # redownload the node db etc...
|
||||
elif fromRadio.config:
|
||||
logging.debug("Hey! We got some configs")
|
||||
if fromRadio.config.HasField("device"):
|
||||
logging.debug("device!")
|
||||
# TODO: do something with this config
|
||||
elif fromRadio.config.HasField("position"):
|
||||
logging.debug("position!")
|
||||
elif fromRadio.config.HasField("power"):
|
||||
logging.debug("power!")
|
||||
elif fromRadio.config.HasField("wifi"):
|
||||
logging.debug("wifi!")
|
||||
elif fromRadio.config.HasField("display"):
|
||||
logging.debug("display!")
|
||||
elif fromRadio.config.HasField("lora"):
|
||||
logging.debug("lora!")
|
||||
else:
|
||||
logging.debug("Unexpected FromRadio payload")
|
||||
|
||||
|
||||
@@ -86,7 +86,7 @@ class Node:
|
||||
|
||||
def waitForConfig(self, attribute='channels'):
|
||||
"""Block until radio config is received. Returns True if config has been received."""
|
||||
return self._timeout.waitForSet(self, attrs=('radioConfig', attribute))
|
||||
return self._timeout.waitForSet(self, attrs=('config', attribute))
|
||||
|
||||
def writeConfig(self):
|
||||
"""Write the current (edited) radioConfig to the device"""
|
||||
@@ -214,7 +214,7 @@ class Node:
|
||||
channelSet.settings.append(c.settings)
|
||||
some_bytes = channelSet.SerializeToString()
|
||||
s = base64.urlsafe_b64encode(some_bytes).decode('ascii')
|
||||
return f"https://www.meshtastic.org/d/#{s}".replace("=", "")
|
||||
return f"https://www.meshtastic.org/e/#{s}".replace("=", "")
|
||||
|
||||
def setURL(self, url):
|
||||
"""Set mesh network URL"""
|
||||
@@ -261,11 +261,12 @@ class Node:
|
||||
if p["decoded"]["routing"]["errorReason"] != "NONE":
|
||||
errorFound = True
|
||||
print(f'Error on response: {p["decoded"]["routing"]["errorReason"]}')
|
||||
if errorFound is False:
|
||||
self.partialConfig[p["decoded"]["admin"]["payloadVariant"]] = p["decoded"]["admin"]["raw"].get_config_response
|
||||
logging.debug(f'self.partialConfig:{self.partialConfig}')
|
||||
self._timeout.reset() # We made foreward progress
|
||||
self.gotResponse = True
|
||||
# TODO
|
||||
#if errorFound is False:
|
||||
#self.partialConfig[p["decoded"]["admin"]["payloadVariant"]] = p["decoded"]["admin"]["raw"].get_config_response
|
||||
#logging.debug(f'self.partialConfig:{self.partialConfig}')
|
||||
#self._timeout.reset() # We made foreward progress
|
||||
#self.gotResponse = True
|
||||
|
||||
def _requestSettings(self):
|
||||
"""Done with initial config messages, now send regular
|
||||
@@ -281,48 +282,30 @@ class Node:
|
||||
print(" 3. All devices have the same modem config. (i.e., '--ch-longfast')")
|
||||
print(" 4. All devices have been rebooted after all of the above. (optional, but recommended)")
|
||||
print("Note: This could take a while (it requests remote channel configs, then writes config)")
|
||||
|
||||
|
||||
p1 = admin_pb2.AdminMessage()
|
||||
p1.get_config_request = admin_pb2.AdminMessage.ConfigType.DEVICE_CONFIG;
|
||||
self.gotResponse = False
|
||||
p1.get_config_request = admin_pb2.AdminMessage.ConfigType.DEVICE_CONFIG
|
||||
self._sendAdmin(p1, wantResponse=True, onResponse=self.onResponseRequestSettings)
|
||||
while self.gotResponse is False:
|
||||
time.sleep(0.1)
|
||||
|
||||
p2 = admin_pb2.AdminMessage()
|
||||
p2.get_config_request = admin_pb2.AdminMessage.ConfigType.POSITION_CONFIG;
|
||||
self.gotResponse = False
|
||||
p2.get_config_request = admin_pb2.AdminMessage.ConfigType.POSITION_CONFIG
|
||||
self._sendAdmin(p2, wantResponse=True, onResponse=self.onResponseRequestSettings)
|
||||
while self.gotResponse is False:
|
||||
time.sleep(0.1)
|
||||
|
||||
p3 = admin_pb2.AdminMessage()
|
||||
p3.get_config_request = admin_pb2.AdminMessage.ConfigType.POWER_CONFIG;
|
||||
self.gotResponse = False
|
||||
p3.get_config_request = admin_pb2.AdminMessage.ConfigType.POWER_CONFIG
|
||||
self._sendAdmin(p3, wantResponse=True, onResponse=self.onResponseRequestSettings)
|
||||
while self.gotResponse is False:
|
||||
time.sleep(0.1)
|
||||
|
||||
p4 = admin_pb2.AdminMessage()
|
||||
p4.get_config_request = admin_pb2.AdminMessage.ConfigType.WIFI_CONFIG;
|
||||
self.gotResponse = False
|
||||
p4.get_config_request = admin_pb2.AdminMessage.ConfigType.WIFI_CONFIG
|
||||
self._sendAdmin(p4, wantResponse=True, onResponse=self.onResponseRequestSettings)
|
||||
while self.gotResponse is False:
|
||||
time.sleep(0.1)
|
||||
|
||||
p5 = admin_pb2.AdminMessage()
|
||||
p5.get_config_request = admin_pb2.AdminMessage.ConfigType.DISPLAY_CONFIG;
|
||||
self.gotResponse = False
|
||||
p5.get_config_request = admin_pb2.AdminMessage.ConfigType.DISPLAY_CONFIG
|
||||
self._sendAdmin(p5, wantResponse=True, onResponse=self.onResponseRequestSettings)
|
||||
while self.gotResponse is False:
|
||||
time.sleep(0.1)
|
||||
|
||||
p6 = admin_pb2.AdminMessage()
|
||||
p6.get_config_request = admin_pb2.AdminMessage.ConfigType.LORA_CONFIG;
|
||||
self.gotResponse = False
|
||||
p6.get_config_request = admin_pb2.AdminMessage.ConfigType.LORA_CONFIG
|
||||
self._sendAdmin(p6, wantResponse=True, onResponse=self.onResponseRequestSettings)
|
||||
while self.gotResponse is False:
|
||||
time.sleep(0.1)
|
||||
|
||||
# TODO Assemble radioConfig
|
||||
|
||||
|
||||
@@ -131,9 +131,9 @@ class StreamInterface(MeshInterface):
|
||||
|
||||
try:
|
||||
while not self._wantExit:
|
||||
logging.debug("reading character")
|
||||
#logging.debug("reading character")
|
||||
b = self._readBytes(1)
|
||||
logging.debug("In reader loop")
|
||||
#logging.debug("In reader loop")
|
||||
#logging.debug(f"read returned {b}")
|
||||
if len(b) > 0:
|
||||
c = b[0]
|
||||
|
||||
@@ -196,15 +196,16 @@ def test_sendPosition(caplog):
|
||||
# iface.close()
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
@pytest.mark.usefixtures("reset_globals")
|
||||
def test_handleFromRadio_empty_payload(caplog):
|
||||
"""Test _handleFromRadio"""
|
||||
iface = MeshInterface(noProto=True)
|
||||
with caplog.at_level(logging.DEBUG):
|
||||
iface._handleFromRadio(b'')
|
||||
iface.close()
|
||||
assert re.search(r'Unexpected FromRadio payload', caplog.text, re.MULTILINE)
|
||||
# TODO
|
||||
#@pytest.mark.unit
|
||||
#@pytest.mark.usefixtures("reset_globals")
|
||||
#def test_handleFromRadio_empty_payload(caplog):
|
||||
# """Test _handleFromRadio"""
|
||||
# iface = MeshInterface(noProto=True)
|
||||
# with caplog.at_level(logging.DEBUG):
|
||||
# iface._handleFromRadio(b'')
|
||||
# iface.close()
|
||||
# assert re.search(r'Unexpected FromRadio payload', caplog.text, re.MULTILINE)
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
"""Meshtastic unit tests for stream_interface.py"""
|
||||
|
||||
import logging
|
||||
import re
|
||||
#import re
|
||||
|
||||
from unittest.mock import MagicMock
|
||||
import pytest
|
||||
@@ -35,48 +35,49 @@ def test_StreamInterface_with_noProto(caplog):
|
||||
assert data == test_data
|
||||
|
||||
|
||||
## Note: This takes a bit, so moving from unit to slow
|
||||
## Tip: If you want to see the print output, run with '-s' flag:
|
||||
## pytest -s meshtastic/tests/test_stream_interface.py::test_sendToRadioImpl
|
||||
@pytest.mark.unitslow
|
||||
@pytest.mark.usefixtures("reset_globals")
|
||||
def test_sendToRadioImpl(caplog):
|
||||
"""Test _sendToRadioImpl()"""
|
||||
|
||||
# def add_header(b):
|
||||
# """Add header stuffs for radio"""
|
||||
# bufLen = len(b)
|
||||
# header = bytes([START1, START2, (bufLen >> 8) & 0xff, bufLen & 0xff])
|
||||
# return header + b
|
||||
|
||||
# captured raw bytes of a Heltec2.1 radio with 2 channels (primary and a secondary channel named "gpio")
|
||||
raw_1_my_info = b'\x1a,\x08\xdc\x8c\xd5\xc5\x02\x18\r2\x0e1.2.49.5354c49P\x15]\xe1%\x17Eh\xe0\xa7\x12p\xe8\x9d\x01x\x08\x90\x01\x01'
|
||||
raw_2_node_info = b'"9\x08\xdc\x8c\xd5\xc5\x02\x12(\n\t!28b5465c\x12\x0cUnknown 465c\x1a\x03?5C"\x06$o(\xb5F\\0\n\x1a\x02 1%M<\xc6a'
|
||||
# pylint: disable=C0301
|
||||
raw_3_node_info = b'"C\x08\xa4\x8c\xd5\xc5\x02\x12(\n\t!28b54624\x12\x0cUnknown 4624\x1a\x03?24"\x06$o(\xb5F$0\n\x1a\x07 5MH<\xc6a%G<\xc6a=\x00\x00\xc0@'
|
||||
raw_4_complete = b'@\xcf\xe5\xd1\x8c\x0e'
|
||||
# pylint: disable=C0301
|
||||
raw_5_prefs = b'Z6\r\\F\xb5(\x15\\F\xb5("\x1c\x08\x06\x12\x13*\x11\n\x0f0\x84\x07P\xac\x02\x88\x01\x01\xb0\t#\xb8\t\x015]$\xddk5\xd5\x7f!b=M<\xc6aP\x03`F'
|
||||
# pylint: disable=C0301
|
||||
raw_6_channel0 = b'Z.\r\\F\xb5(\x15\\F\xb5("\x14\x08\x06\x12\x0b:\t\x12\x05\x18\x01"\x01\x01\x18\x015^$\xddk5\xd6\x7f!b=M<\xc6aP\x03`F'
|
||||
# pylint: disable=C0301
|
||||
raw_7_channel1 = b'ZS\r\\F\xb5(\x15\\F\xb5("9\x08\x06\x120:.\x08\x01\x12(" \xb4&\xb3\xc7\x06\xd8\xe39%\xba\xa5\xee\x8eH\x06\xf6\xf4H\xe8\xd5\xc1[ao\xb5Y\\\xb4"\xafmi*\x04gpio\x18\x025_$\xddk5\xd7\x7f!b=M<\xc6aP\x03`F'
|
||||
raw_8_channel2 = b'Z)\r\\F\xb5(\x15\\F\xb5("\x0f\x08\x06\x12\x06:\x04\x08\x02\x12\x005`$\xddk5\xd8\x7f!b=M<\xc6aP\x03`F'
|
||||
raw_blank = b''
|
||||
|
||||
test_data = b'hello'
|
||||
stream = MagicMock()
|
||||
#stream.read.return_value = add_header(test_data)
|
||||
stream.read.side_effect = [ raw_1_my_info, raw_2_node_info, raw_3_node_info, raw_4_complete,
|
||||
raw_5_prefs, raw_6_channel0, raw_7_channel1, raw_8_channel2,
|
||||
raw_blank, raw_blank]
|
||||
toRadio = MagicMock()
|
||||
toRadio.SerializeToString.return_value = test_data
|
||||
with caplog.at_level(logging.DEBUG):
|
||||
iface = StreamInterface(noProto=True, connectNow=False)
|
||||
iface.stream = stream
|
||||
iface.connect()
|
||||
iface._sendToRadioImpl(toRadio)
|
||||
assert re.search(r'Sending: ', caplog.text, re.MULTILINE)
|
||||
assert re.search(r'reading character', caplog.text, re.MULTILINE)
|
||||
assert re.search(r'In reader loop', caplog.text, re.MULTILINE)
|
||||
# TODO
|
||||
### Note: This takes a bit, so moving from unit to slow
|
||||
### Tip: If you want to see the print output, run with '-s' flag:
|
||||
### pytest -s meshtastic/tests/test_stream_interface.py::test_sendToRadioImpl
|
||||
#@pytest.mark.unitslow
|
||||
#@pytest.mark.usefixtures("reset_globals")
|
||||
#def test_sendToRadioImpl(caplog):
|
||||
# """Test _sendToRadioImpl()"""
|
||||
#
|
||||
## def add_header(b):
|
||||
## """Add header stuffs for radio"""
|
||||
## bufLen = len(b)
|
||||
## header = bytes([START1, START2, (bufLen >> 8) & 0xff, bufLen & 0xff])
|
||||
## return header + b
|
||||
#
|
||||
# # captured raw bytes of a Heltec2.1 radio with 2 channels (primary and a secondary channel named "gpio")
|
||||
# raw_1_my_info = b'\x1a,\x08\xdc\x8c\xd5\xc5\x02\x18\r2\x0e1.2.49.5354c49P\x15]\xe1%\x17Eh\xe0\xa7\x12p\xe8\x9d\x01x\x08\x90\x01\x01'
|
||||
# raw_2_node_info = b'"9\x08\xdc\x8c\xd5\xc5\x02\x12(\n\t!28b5465c\x12\x0cUnknown 465c\x1a\x03?5C"\x06$o(\xb5F\\0\n\x1a\x02 1%M<\xc6a'
|
||||
# # pylint: disable=C0301
|
||||
# raw_3_node_info = b'"C\x08\xa4\x8c\xd5\xc5\x02\x12(\n\t!28b54624\x12\x0cUnknown 4624\x1a\x03?24"\x06$o(\xb5F$0\n\x1a\x07 5MH<\xc6a%G<\xc6a=\x00\x00\xc0@'
|
||||
# raw_4_complete = b'@\xcf\xe5\xd1\x8c\x0e'
|
||||
# # pylint: disable=C0301
|
||||
# raw_5_prefs = b'Z6\r\\F\xb5(\x15\\F\xb5("\x1c\x08\x06\x12\x13*\x11\n\x0f0\x84\x07P\xac\x02\x88\x01\x01\xb0\t#\xb8\t\x015]$\xddk5\xd5\x7f!b=M<\xc6aP\x03`F'
|
||||
# # pylint: disable=C0301
|
||||
# raw_6_channel0 = b'Z.\r\\F\xb5(\x15\\F\xb5("\x14\x08\x06\x12\x0b:\t\x12\x05\x18\x01"\x01\x01\x18\x015^$\xddk5\xd6\x7f!b=M<\xc6aP\x03`F'
|
||||
# # pylint: disable=C0301
|
||||
# raw_7_channel1 = b'ZS\r\\F\xb5(\x15\\F\xb5("9\x08\x06\x120:.\x08\x01\x12(" \xb4&\xb3\xc7\x06\xd8\xe39%\xba\xa5\xee\x8eH\x06\xf6\xf4H\xe8\xd5\xc1[ao\xb5Y\\\xb4"\xafmi*\x04gpio\x18\x025_$\xddk5\xd7\x7f!b=M<\xc6aP\x03`F'
|
||||
# raw_8_channel2 = b'Z)\r\\F\xb5(\x15\\F\xb5("\x0f\x08\x06\x12\x06:\x04\x08\x02\x12\x005`$\xddk5\xd8\x7f!b=M<\xc6aP\x03`F'
|
||||
# raw_blank = b''
|
||||
#
|
||||
# test_data = b'hello'
|
||||
# stream = MagicMock()
|
||||
# #stream.read.return_value = add_header(test_data)
|
||||
# stream.read.side_effect = [ raw_1_my_info, raw_2_node_info, raw_3_node_info, raw_4_complete,
|
||||
# raw_5_prefs, raw_6_channel0, raw_7_channel1, raw_8_channel2,
|
||||
# raw_blank, raw_blank]
|
||||
# toRadio = MagicMock()
|
||||
# toRadio.SerializeToString.return_value = test_data
|
||||
# with caplog.at_level(logging.DEBUG):
|
||||
# iface = StreamInterface(noProto=True, connectNow=False)
|
||||
# iface.stream = stream
|
||||
# iface.connect()
|
||||
# iface._sendToRadioImpl(toRadio)
|
||||
# assert re.search(r'Sending: ', caplog.text, re.MULTILINE)
|
||||
# assert re.search(r'reading character', caplog.text, re.MULTILINE)
|
||||
# assert re.search(r'In reader loop', caplog.text, re.MULTILINE)
|
||||
|
||||
Reference in New Issue
Block a user