Merge pull request #175 from mkinney/continue_working_on_unit_tests

add unit tests for toRadio, sendData() and sendPosition(); found and …
This commit is contained in:
mkinney
2021-12-22 16:29:45 -08:00
committed by GitHub
7 changed files with 1051 additions and 50 deletions

View File

@@ -10,6 +10,7 @@ install:
lint:
pylint meshtastic
# run the coverage report and open results in a browser
cov:
pytest --cov-report html --cov=meshtastic
# on mac, this will open the coverage report in a browser
@@ -19,4 +20,5 @@ cov:
examples: FORCE
pytest -mexamples
# Makefile hack to get the examples to always run
FORCE: ;

View File

@@ -28,7 +28,7 @@ An example using Python 3 code to send a message to the mesh:
```
import meshtastic
interface = meshtastic.SerialInterface() # By default will try to find a meshtastic device, otherwise provide a device path like /dev/ttyUSB0
interface = meshtastic.serial_interface.SerialInterface() # By default will try to find a meshtastic device, otherwise provide a device path like /dev/ttyUSB0
interface.sendText("hello mesh") # or sendData to send binary data, see documentations for other options.
interface.close()
```
@@ -103,7 +103,7 @@ You can even set the channel preshared key to a particular AES128 or AES256 sequ
meshtastic --ch-set psk 0x1a1a1a1a2b2b2b2b1a1a1a1a2b2b2b2b1a1a1a1a2b2b2b2b1a1a1a1a2b2b2b2b --info
```
Use "--ch-set psk none" to turn off encryption.
Use "--ch-set psk none" to turn off encryption.
Use "--ch-set psk random" will assign a new (high quality) random AES256 key to the primary channel (similar to what the Android app does when making new channels).

View File

@@ -103,6 +103,7 @@ class MeshInterface:
rows = []
if self.nodes:
logging.debug(f'self.nodes:{self.nodes}')
for node in self.nodes.values():
if not includeSelf and node['num'] == self.localNode.nodeNum:
continue
@@ -227,8 +228,9 @@ class MeshInterface:
data = data.SerializeToString()
logging.debug(f"len(data): {len(data)}")
logging.debug(f"mesh_pb2.Constants.DATA_PAYLOAD_LEN: {mesh_pb2.Constants.DATA_PAYLOAD_LEN}")
if len(data) > mesh_pb2.Constants.DATA_PAYLOAD_LEN:
Exception("Data payload too big")
raise Exception("Data payload too big")
if portNum == portnums_pb2.PortNum.UNKNOWN_APP: # we are now more strict wrt port numbers
our_exit("Warning: A non-zero port number must be specified")
@@ -261,12 +263,15 @@ class MeshInterface:
p = mesh_pb2.Position()
if latitude != 0.0:
p.latitude_i = int(latitude / 1e-7)
logging.debug(f'p.latitude_i:{p.latitude_i}')
if longitude != 0.0:
p.longitude_i = int(longitude / 1e-7)
logging.debug(f'p.longitude_i:{p.longitude_i}')
if altitude != 0:
p.altitude = int(altitude)
logging.debug(f'p.altitude:{p.altitude}')
if timeSec == 0:
timeSec = time.time() # returns unix timestamp in seconds
@@ -307,7 +312,10 @@ class MeshInterface:
elif destinationId == BROADCAST_ADDR:
nodeNum = BROADCAST_NUM
elif destinationId == LOCAL_ADDR:
nodeNum = self.myInfo.my_node_num
if self.myInfo:
nodeNum = self.myInfo.my_node_num
else:
our_exit("Warning: No myInfo found.")
# A simple hex style nodeid - we can parse this without needing the DB
elif destinationId.startswith("!"):
nodeNum = int(destinationId[1:], 16)
@@ -330,7 +338,7 @@ class MeshInterface:
meshPacket.id = self._generatePacketId()
toRadio.packet.CopyFrom(meshPacket)
#logging.debug(f"Sending packet: {stripnl(meshPacket)}")
logging.debug(f"Sending packet: {stripnl(meshPacket)}")
self._sendToRadio(toRadio)
return meshPacket
@@ -344,6 +352,7 @@ class MeshInterface:
"""Get info about my node."""
if self.myInfo is None:
return None
logging.debug(f'self.nodesByNum:{self.nodesByNum}')
return self.nodesByNum.get(self.myInfo.my_node_num)
def getMyUser(self):
@@ -370,8 +379,9 @@ class MeshInterface:
def _waitConnected(self):
"""Block until the initial node db download is complete, or timeout
and raise an exception"""
if not self.isConnected.wait(15.0): # timeout after x seconds
raise Exception("Timed out waiting for connection completion")
if not self.noProto:
if not self.isConnected.wait(15.0): # timeout after x seconds
raise Exception("Timed out waiting for connection completion")
# If we failed while connecting, raise the connection to the client
if self.failure:

View File

@@ -8,6 +8,8 @@ from . import portnums_pb2, apponly_pb2, admin_pb2, channel_pb2
from .util import pskToString, stripnl, Timeout, our_exit, fromPSK
class Node:
"""A model of a (local or remote) node in the mesh
@@ -28,7 +30,9 @@ class Node:
"""Show human readable description of our channels."""
print("Channels:")
if self.channels:
logging.debug(f'self.channels:{self.channels}')
for c in self.channels:
#print('c.settings.psk:', c.settings.psk)
cStr = stripnl(MessageToJson(c.settings))
# only show if there is no psk (meaning disabled channel)
if c.settings.psk:
@@ -108,7 +112,7 @@ class Node:
# *moving* the admin channel index as we are writing
if (self.iface.localNode == self) and index >= adminIndex:
# We've now passed the old location for admin index
# (and writen it), so we can start finding it by name again
# (and written it), so we can start finding it by name again
adminIndex = 0
def getChannelByName(self, name):
@@ -220,25 +224,29 @@ class Node:
self.writeChannel(ch.index)
i = i + 1
def onResponseRequestSettings(self, p):
"""Handle the response packet for requesting settings _requestSettings()"""
logging.debug(f'onResponseRequestSetting() p:{p}')
errorFound = False
if 'routing' in p["decoded"]:
if p["decoded"]["routing"]["errorReason"] != "NONE":
errorFound = True
print(f'Error on response: {p["decoded"]["routing"]["errorReason"]}')
if errorFound is False:
self.radioConfig = p["decoded"]["admin"]["raw"].get_radio_response
logging.debug(f'self.radioConfig:{self.radioConfig}')
logging.debug("Received radio config, now fetching channels...")
self._timeout.reset() # We made foreward progress
self._requestChannel(0) # now start fetching channels
def _requestSettings(self):
"""Done with initial config messages, now send regular
MeshPackets to ask for settings."""
p = admin_pb2.AdminMessage()
p.get_radio_request = True
def onResponse(p):
"""A closure to handle the response packet"""
errorFound = False
if 'routing' in p["decoded"]:
if p["decoded"]["routing"]["errorReason"] != "NONE":
errorFound = True
print(f'Error on response: {p["decoded"]["routing"]["errorReason"]}')
if errorFound is False:
self.radioConfig = p["decoded"]["admin"]["raw"].get_radio_response
logging.debug("Received radio config, now fetching channels...")
self._timeout.reset() # We made foreward progress
self._requestChannel(0) # now start fetching channels
# Show progress message for super slow operations
if self != self.iface.localNode:
print("Requesting preferences from remote node.")
@@ -249,7 +257,7 @@ class Node:
print(" 4. All devices have been rebooted after all of the above. (optional, but recommended)")
print("Note: This could take a while (it requests remote channel configs, then writes config)")
return self._sendAdmin(p, wantResponse=True, onResponse=onResponse)
return self._sendAdmin(p, wantResponse=True, onResponse=self.onResponseRequestSettings)
def exitSimulator(self):
"""Tell a simulator node to exit (this message
@@ -290,6 +298,34 @@ class Node:
self.channels.append(ch)
index += 1
def onResponseRequestChannel(self, p):
"""Handle the response packet for requesting a channel _requestChannel()"""
logging.debug(f'onResponseRequestChannel() p:{p}')
c = p["decoded"]["admin"]["raw"].get_channel_response
self.partialChannels.append(c)
self._timeout.reset() # We made foreward progress
logging.debug(f"Received channel {stripnl(c)}")
index = c.index
# for stress testing, we can always download all channels
fastChannelDownload = True
# Once we see a response that has NO settings, assume
# we are at the end of channels and stop fetching
quitEarly = (c.role == channel_pb2.Channel.Role.DISABLED) and fastChannelDownload
if quitEarly or index >= self.iface.myInfo.max_channels - 1:
logging.debug("Finished downloading channels")
self.channels = self.partialChannels
self._fixupChannels()
# FIXME, the following should only be called after we have settings and channels
self.iface._connected() # Tell everyone else we are ready to go
else:
self._requestChannel(index + 1)
def _requestChannel(self, channelNum: int):
"""Done with initial config messages, now send regular
MeshPackets to ask for settings"""
@@ -303,33 +339,7 @@ class Node:
else:
logging.debug(f"Requesting channel {channelNum}")
def onResponse(p):
"""A closure to handle the response packet for requesting a channel"""
c = p["decoded"]["admin"]["raw"].get_channel_response
self.partialChannels.append(c)
self._timeout.reset() # We made foreward progress
logging.debug(f"Received channel {stripnl(c)}")
index = c.index
# for stress testing, we can always download all channels
fastChannelDownload = True
# Once we see a response that has NO settings, assume
# we are at the end of channels and stop fetching
quitEarly = (c.role == channel_pb2.Channel.Role.DISABLED) and fastChannelDownload
if quitEarly or index >= self.iface.myInfo.max_channels - 1:
logging.debug("Finished downloading channels")
self.channels = self.partialChannels
self._fixupChannels()
# FIXME, the following should only be called after we have settings and channels
self.iface._connected() # Tell everyone else we are ready to go
else:
self._requestChannel(index + 1)
return self._sendAdmin(p, wantResponse=True, onResponse=onResponse)
return self._sendAdmin(p, wantResponse=True, onResponse=self.onResponseRequestChannel)
# pylint: disable=R1710

View File

@@ -2,9 +2,11 @@
import argparse
from unittest.mock import MagicMock
import pytest
from meshtastic.__main__ import Globals
from ..mesh_interface import MeshInterface
@pytest.fixture
def reset_globals():
@@ -13,3 +15,46 @@ def reset_globals():
parser = argparse.ArgumentParser()
Globals.getInstance().reset()
Globals.getInstance().set_parser(parser)
@pytest.fixture
def iface_with_nodes():
"""Fixture to setup some nodes."""
nodesById = {
'!9388f81c': {
'num': 2475227164,
'user': {
'id': '!9388f81c',
'longName': 'Unknown f81c',
'shortName': '?1C',
'macaddr': 'RBeTiPgc',
'hwModel': 'TBEAM'
},
'position': {},
'lastHeard': 1640204888
}
}
nodesByNum = {
2475227164: {
'num': 2475227164,
'user': {
'id': '!9388f81c',
'longName': 'Unknown f81c',
'shortName': '?1C',
'macaddr': 'RBeTiPgc',
'hwModel': 'TBEAM'
},
'position': {
'time': 1640206266
},
'lastHeard': 1640206266
}
}
iface = MeshInterface(noProto=True)
iface.nodes = nodesById
iface.nodesByNum = nodesByNum
myInfo = MagicMock()
iface.myInfo = myInfo
iface.myInfo.my_node_num = 2475227164
return iface

View File

@@ -9,13 +9,36 @@ import pytest
from ..mesh_interface import MeshInterface
from ..node import Node
from .. import mesh_pb2
from ..__init__ import LOCAL_ADDR
from ..__init__ import LOCAL_ADDR, BROADCAST_ADDR
@pytest.mark.unit
def test_MeshInterface(capsys, reset_globals):
"""Test that we can instantiate a MeshInterface"""
iface = MeshInterface(noProto=True)
anode = Node('foo', 'bar')
nodes = {
'!9388f81c': {
'num': 2475227164,
'user': {
'id': '!9388f81c',
'longName': 'Unknown f81c',
'shortName': '?1C',
'macaddr': 'RBeTiPgc',
'hwModel': 'TBEAM'
},
'position': {},
'lastHeard': 1640204888
}
}
iface.nodesByNum = {1: anode }
iface.nodes = nodes
myInfo = MagicMock()
iface.myInfo = myInfo
iface.showInfo()
iface.localNode.showInfo()
iface.showNodes()
@@ -30,6 +53,36 @@ def test_MeshInterface(capsys, reset_globals):
assert err == ''
@pytest.mark.unit
def test_getMyUser(reset_globals, iface_with_nodes):
"""Test getMyUser()"""
iface = iface_with_nodes
iface.myInfo.my_node_num = 2475227164
myuser = iface.getMyUser()
print(f'myuser:{myuser}')
assert myuser is not None
assert myuser["id"] == '!9388f81c'
@pytest.mark.unit
def test_getLongName(reset_globals, iface_with_nodes):
"""Test getLongName()"""
iface = iface_with_nodes
iface.myInfo.my_node_num = 2475227164
mylongname = iface.getLongName()
assert mylongname == 'Unknown f81c'
@pytest.mark.unit
def test_getShortName(reset_globals, iface_with_nodes):
"""Test getShortName()."""
iface = iface_with_nodes
iface.myInfo.my_node_num = 2475227164
myshortname = iface.getShortName()
assert myshortname == '?1C'
@pytest.mark.unit
def test_handlePacketFromRadio_no_from(capsys, reset_globals):
"""Test _handlePacketFromRadio with no 'from' in the mesh packet."""
@@ -216,3 +269,194 @@ def test_handleFromRadio_with_node_info_tbeam_with_bad_data(reset_globals, caplo
with caplog.at_level(logging.DEBUG):
iface._startConfig()
iface._handleFromRadio(from_radio_bytes)
@pytest.mark.unit
def test_MeshInterface_sendToRadioImpl(caplog, reset_globals):
"""Test _sendToRadioImp()"""
iface = MeshInterface(noProto=True)
with caplog.at_level(logging.DEBUG):
iface._sendToRadioImpl('foo')
assert re.search(r'Subclass must provide toradio', caplog.text, re.MULTILINE)
iface.close()
@pytest.mark.unit
def test_MeshInterface_sendToRadio_no_proto(caplog, reset_globals):
"""Test sendToRadio()"""
iface = MeshInterface()
with caplog.at_level(logging.DEBUG):
iface._sendToRadioImpl('foo')
assert re.search(r'Subclass must provide toradio', caplog.text, re.MULTILINE)
iface.close()
@pytest.mark.unit
def test_sendData_too_long(caplog, reset_globals):
"""Test when data payload is too big"""
iface = MeshInterface(noProto=True)
some_large_text = b'This is a long text that will be too long for send text.'
some_large_text += b'This is a long text that will be too long for send text.'
some_large_text += b'This is a long text that will be too long for send text.'
some_large_text += b'This is a long text that will be too long for send text.'
some_large_text += b'This is a long text that will be too long for send text.'
some_large_text += b'This is a long text that will be too long for send text.'
some_large_text += b'This is a long text that will be too long for send text.'
some_large_text += b'This is a long text that will be too long for send text.'
some_large_text += b'This is a long text that will be too long for send text.'
some_large_text += b'This is a long text that will be too long for send text.'
some_large_text += b'This is a long text that will be too long for send text.'
some_large_text += b'This is a long text that will be too long for send text.'
with caplog.at_level(logging.DEBUG):
with pytest.raises(Exception) as pytest_wrapped_e:
iface.sendData(some_large_text)
assert re.search('Data payload too big', caplog.text, re.MULTILINE)
assert pytest_wrapped_e.type == Exception
iface.close()
@pytest.mark.unit
def test_sendData_unknown_app(capsys, reset_globals):
"""Test sendData when unknown app"""
iface = MeshInterface(noProto=True)
with pytest.raises(SystemExit) as pytest_wrapped_e:
iface.sendData(b'hello', portNum=0)
out, err = capsys.readouterr()
assert re.search(r'Warning: A non-zero port number', out, re.MULTILINE)
assert err == ''
assert pytest_wrapped_e.type == SystemExit
assert pytest_wrapped_e.value.code == 1
@pytest.mark.unit
def test_sendPosition_with_a_position(caplog, reset_globals):
"""Test sendPosition when lat/long/alt"""
iface = MeshInterface(noProto=True)
with caplog.at_level(logging.DEBUG):
iface.sendPosition(latitude=40.8, longitude=-111.86, altitude=201)
assert re.search(r'p.latitude_i:408', caplog.text, re.MULTILINE)
assert re.search(r'p.longitude_i:-11186', caplog.text, re.MULTILINE)
assert re.search(r'p.altitude:201', caplog.text, re.MULTILINE)
@pytest.mark.unit
def test_sendPacket_with_no_destination(capsys, reset_globals):
"""Test _sendPacket()"""
iface = MeshInterface(noProto=True)
with pytest.raises(SystemExit) as pytest_wrapped_e:
iface._sendPacket(b'', destinationId=None)
out, err = capsys.readouterr()
assert re.search(r'Warning: destinationId must not be None', out, re.MULTILINE)
assert err == ''
assert pytest_wrapped_e.type == SystemExit
assert pytest_wrapped_e.value.code == 1
@pytest.mark.unit
def test_sendPacket_with_destination_as_int(caplog, reset_globals):
"""Test _sendPacket() with int as a destination"""
iface = MeshInterface(noProto=True)
with caplog.at_level(logging.DEBUG):
meshPacket = mesh_pb2.MeshPacket()
iface._sendPacket(meshPacket, destinationId=123)
assert re.search(r'Sending packet', caplog.text, re.MULTILINE)
@pytest.mark.unit
def test_sendPacket_with_destination_starting_with_a_bang(caplog, reset_globals):
"""Test _sendPacket() with int as a destination"""
iface = MeshInterface(noProto=True)
with caplog.at_level(logging.DEBUG):
meshPacket = mesh_pb2.MeshPacket()
iface._sendPacket(meshPacket, destinationId='!1234')
assert re.search(r'Sending packet', caplog.text, re.MULTILINE)
@pytest.mark.unit
def test_sendPacket_with_destination_as_BROADCAST_ADDR(caplog, reset_globals):
"""Test _sendPacket() with BROADCAST_ADDR as a destination"""
iface = MeshInterface(noProto=True)
with caplog.at_level(logging.DEBUG):
meshPacket = mesh_pb2.MeshPacket()
iface._sendPacket(meshPacket, destinationId=BROADCAST_ADDR)
assert re.search(r'Sending packet', caplog.text, re.MULTILINE)
@pytest.mark.unit
def test_sendPacket_with_destination_as_LOCAL_ADDR_no_myInfo(capsys, reset_globals):
"""Test _sendPacket() with LOCAL_ADDR as a destination with no myInfo"""
iface = MeshInterface(noProto=True)
with pytest.raises(SystemExit) as pytest_wrapped_e:
meshPacket = mesh_pb2.MeshPacket()
iface._sendPacket(meshPacket, destinationId=LOCAL_ADDR)
out, err = capsys.readouterr()
assert re.search(r'Warning: No myInfo', out, re.MULTILINE)
assert err == ''
assert pytest_wrapped_e.type == SystemExit
assert pytest_wrapped_e.value.code == 1
@pytest.mark.unit
def test_sendPacket_with_destination_as_LOCAL_ADDR_with_myInfo(caplog, reset_globals):
"""Test _sendPacket() with LOCAL_ADDR as a destination with myInfo"""
iface = MeshInterface(noProto=True)
myInfo = MagicMock()
iface.myInfo = myInfo
with caplog.at_level(logging.DEBUG):
meshPacket = mesh_pb2.MeshPacket()
iface._sendPacket(meshPacket, destinationId=LOCAL_ADDR)
assert re.search(r'Sending packet', caplog.text, re.MULTILINE)
@pytest.mark.unit
def test_sendPacket_with_destination_is_blank_with_nodes(capsys, reset_globals, iface_with_nodes):
"""Test _sendPacket() with '' as a destination with myInfo"""
iface = iface_with_nodes
meshPacket = mesh_pb2.MeshPacket()
with pytest.raises(SystemExit) as pytest_wrapped_e:
iface._sendPacket(meshPacket, destinationId='')
assert pytest_wrapped_e.type == SystemExit
assert pytest_wrapped_e.value.code == 1
out, err = capsys.readouterr()
assert re.match(r'Warning: NodeId not found in DB', out, re.MULTILINE)
assert err == ''
@pytest.mark.unit
def test_sendPacket_with_destination_is_blank_without_nodes(caplog, reset_globals, iface_with_nodes):
"""Test _sendPacket() with '' as a destination with myInfo"""
iface = iface_with_nodes
iface.nodes = None
meshPacket = mesh_pb2.MeshPacket()
with caplog.at_level(logging.WARNING):
iface._sendPacket(meshPacket, destinationId='')
assert re.search(r'Warning: There were no self.nodes.', caplog.text, re.MULTILINE)
@pytest.mark.unit
def test_getMyNodeInfo(reset_globals):
"""Test getMyNodeInfo()"""
iface = MeshInterface(noProto=True)
anode = iface.getNode(LOCAL_ADDR)
iface.nodesByNum = {1: anode }
assert iface.nodesByNum.get(1) == anode
myInfo = MagicMock()
iface.myInfo = myInfo
iface.myInfo.my_node_num = 1
myinfo = iface.getMyNodeInfo()
assert myinfo == anode
@pytest.mark.unit
def test_generatePacketId(capsys, reset_globals):
"""Test _generatePacketId() when no currentPacketId (not connected)"""
iface = MeshInterface(noProto=True)
# not sure when this condition would ever happen... but we can simulate it
iface.currentPacketId = None
assert iface.currentPacketId is None
with pytest.raises(Exception) as pytest_wrapped_e:
iface._generatePacketId()
out, err = capsys.readouterr()
assert re.search(r'Not connected yet, can not generate packet', out, re.MULTILINE)
assert err == ''
assert pytest_wrapped_e.type == Exception

View File

@@ -9,12 +9,16 @@ import pytest
from ..node import Node
from ..serial_interface import SerialInterface
from ..admin_pb2 import AdminMessage
from ..channel_pb2 import Channel
from ..radioconfig_pb2 import RadioConfig
@pytest.mark.unit
def test_node(capsys):
"""Test that we can instantiate a Node"""
anode = Node('foo', 'bar')
radioConfig = RadioConfig()
anode.radioConfig = radioConfig
anode.showChannels()
anode.showInfo()
out, err = capsys.readouterr()
@@ -139,3 +143,689 @@ def test_setURL_valid_URL_but_no_settings(caplog):
anode.setURL(url)
assert pytest_wrapped_e.type == SystemExit
assert pytest_wrapped_e.value.code == 1
@pytest.mark.unit
def test_showChannels(capsys):
"""Test showChannels"""
anode = Node('foo', 'bar')
# primary channel
# role: 0=Disabled, 1=Primary, 2=Secondary
# modem_config: 0-5
# role: 0=Disabled, 1=Primary, 2=Secondary
channel1 = Channel(index=1, role=1)
channel1.settings.modem_config = 3
channel1.settings.psk = b'\x01'
channel2 = Channel(index=2, role=2)
channel2.settings.psk = b'\x8a\x94y\x0e\xc6\xc9\x1e5\x91\x12@\xa60\xa8\xb43\x87\x00\xf2K\x0e\xe7\x7fAz\xcd\xf5\xb0\x900\xa84'
channel2.settings.name = 'testing'
channel3 = Channel(index=3, role=0)
channel4 = Channel(index=4, role=0)
channel5 = Channel(index=5, role=0)
channel6 = Channel(index=6, role=0)
channel7 = Channel(index=7, role=0)
channel8 = Channel(index=8, role=0)
channels = [ channel1, channel2, channel3, channel4, channel5, channel6, channel7, channel8 ]
anode.channels = channels
anode.showChannels()
out, err = capsys.readouterr()
assert re.search(r'Channels:', out, re.MULTILINE)
# primary channel
assert re.search(r'Primary channel URL', out, re.MULTILINE)
assert re.search(r'PRIMARY psk=default ', out, re.MULTILINE)
assert re.search(r'"modemConfig": "Bw125Cr48Sf4096"', out, re.MULTILINE)
assert re.search(r'"psk": "AQ=="', out, re.MULTILINE)
# secondary channel
assert re.search(r'SECONDARY psk=secret ', out, re.MULTILINE)
assert re.search(r'"psk": "ipR5DsbJHjWREkCmMKi0M4cA8ksO539Bes31sJAwqDQ="', out, re.MULTILINE)
assert err == ''
@pytest.mark.unit
def test_deleteChannel_try_to_delete_primary_channel(capsys):
"""Try to delete primary channel."""
anode = Node('foo', 'bar')
channel1 = Channel(index=1, role=1)
channel1.settings.modem_config = 3
channel1.settings.psk = b'\x01'
# no secondary channels
channel2 = Channel(index=2, role=0)
channel3 = Channel(index=3, role=0)
channel4 = Channel(index=4, role=0)
channel5 = Channel(index=5, role=0)
channel6 = Channel(index=6, role=0)
channel7 = Channel(index=7, role=0)
channel8 = Channel(index=8, role=0)
channels = [ channel1, channel2, channel3, channel4, channel5, channel6, channel7, channel8 ]
anode.channels = channels
with pytest.raises(SystemExit) as pytest_wrapped_e:
anode.deleteChannel(0)
assert pytest_wrapped_e.type == SystemExit
assert pytest_wrapped_e.value.code == 1
out, err = capsys.readouterr()
assert re.search(r'Warning: Only SECONDARY channels can be deleted', out, re.MULTILINE)
assert err == ''
@pytest.mark.unit
def test_deleteChannel_secondary():
"""Try to delete a secondary channel."""
channel1 = Channel(index=1, role=1)
channel1.settings.modem_config = 3
channel1.settings.psk = b'\x01'
channel2 = Channel(index=2, role=2)
channel2.settings.psk = b'\x8a\x94y\x0e\xc6\xc9\x1e5\x91\x12@\xa60\xa8\xb43\x87\x00\xf2K\x0e\xe7\x7fAz\xcd\xf5\xb0\x900\xa84'
channel2.settings.name = 'testing'
channel3 = Channel(index=3, role=0)
channel4 = Channel(index=4, role=0)
channel5 = Channel(index=5, role=0)
channel6 = Channel(index=6, role=0)
channel7 = Channel(index=7, role=0)
channel8 = Channel(index=8, role=0)
channels = [ channel1, channel2, channel3, channel4, channel5, channel6, channel7, channel8 ]
iface = MagicMock(autospec=SerialInterface)
with patch('meshtastic.serial_interface.SerialInterface', return_value=iface) as mo:
mo.localNode.getChannelByName.return_value = None
mo.myInfo.max_channels = 8
anode = Node(mo, 'bar', noProto=True)
anode.channels = channels
assert len(anode.channels) == 8
assert channels[0].settings.modem_config == 3
assert channels[1].settings.name == 'testing'
assert channels[2].settings.name == ''
assert channels[3].settings.name == ''
assert channels[4].settings.name == ''
assert channels[5].settings.name == ''
assert channels[6].settings.name == ''
assert channels[7].settings.name == ''
anode.deleteChannel(1)
assert len(anode.channels) == 8
assert channels[0].settings.modem_config == 3
assert channels[1].settings.name == ''
assert channels[2].settings.name == ''
assert channels[3].settings.name == ''
assert channels[4].settings.name == ''
assert channels[5].settings.name == ''
assert channels[6].settings.name == ''
assert channels[7].settings.name == ''
@pytest.mark.unit
def test_deleteChannel_secondary_with_admin_channel_after_testing():
"""Try to delete a secondary channel where there is an admin channel."""
channel1 = Channel(index=1, role=1)
channel1.settings.modem_config = 3
channel1.settings.psk = b'\x01'
channel2 = Channel(index=2, role=2)
channel2.settings.psk = b'\x8a\x94y\x0e\xc6\xc9\x1e5\x91\x12@\xa60\xa8\xb43\x87\x00\xf2K\x0e\xe7\x7fAz\xcd\xf5\xb0\x900\xa84'
channel2.settings.name = 'testing'
channel3 = Channel(index=3, role=2)
channel3.settings.name = 'admin'
channel4 = Channel(index=4, role=0)
channel5 = Channel(index=5, role=0)
channel6 = Channel(index=6, role=0)
channel7 = Channel(index=7, role=0)
channel8 = Channel(index=8, role=0)
channels = [ channel1, channel2, channel3, channel4, channel5, channel6, channel7, channel8 ]
iface = MagicMock(autospec=SerialInterface)
with patch('meshtastic.serial_interface.SerialInterface', return_value=iface) as mo:
mo.localNode.getChannelByName.return_value = None
mo.myInfo.max_channels = 8
anode = Node(mo, 'bar', noProto=True)
# Note: Have to do this next line because every call to MagicMock object/method returns a new magic mock
mo.localNode = anode
assert mo.localNode == anode
anode.channels = channels
assert len(anode.channels) == 8
assert channels[0].settings.modem_config == 3
assert channels[1].settings.name == 'testing'
assert channels[2].settings.name == 'admin'
assert channels[3].settings.name == ''
assert channels[4].settings.name == ''
assert channels[5].settings.name == ''
assert channels[6].settings.name == ''
assert channels[7].settings.name == ''
anode.deleteChannel(1)
assert len(anode.channels) == 8
assert channels[0].settings.modem_config == 3
assert channels[1].settings.name == 'admin'
assert channels[2].settings.name == ''
assert channels[3].settings.name == ''
assert channels[4].settings.name == ''
assert channels[5].settings.name == ''
assert channels[6].settings.name == ''
assert channels[7].settings.name == ''
@pytest.mark.unit
def test_deleteChannel_secondary_with_admin_channel_before_testing():
"""Try to delete a secondary channel where there is an admin channel."""
channel1 = Channel(index=1, role=1)
channel1.settings.modem_config = 3
channel1.settings.psk = b'\x01'
channel2 = Channel(index=2, role=2)
channel2.settings.psk = b'\x8a\x94y\x0e\xc6\xc9\x1e5\x91\x12@\xa60\xa8\xb43\x87\x00\xf2K\x0e\xe7\x7fAz\xcd\xf5\xb0\x900\xa84'
channel2.settings.name = 'admin'
channel3 = Channel(index=3, role=2)
channel3.settings.name = 'testing'
channel4 = Channel(index=4, role=0)
channel5 = Channel(index=5, role=0)
channel6 = Channel(index=6, role=0)
channel7 = Channel(index=7, role=0)
channel8 = Channel(index=8, role=0)
channels = [ channel1, channel2, channel3, channel4, channel5, channel6, channel7, channel8 ]
iface = MagicMock(autospec=SerialInterface)
with patch('meshtastic.serial_interface.SerialInterface', return_value=iface) as mo:
mo.localNode.getChannelByName.return_value = None
mo.myInfo.max_channels = 8
anode = Node(mo, 'bar', noProto=True)
anode.channels = channels
assert len(anode.channels) == 8
assert channels[0].settings.modem_config == 3
assert channels[1].settings.name == 'admin'
assert channels[2].settings.name == 'testing'
assert channels[3].settings.name == ''
assert channels[4].settings.name == ''
assert channels[5].settings.name == ''
assert channels[6].settings.name == ''
assert channels[7].settings.name == ''
anode.deleteChannel(2)
assert len(anode.channels) == 8
assert channels[0].settings.modem_config == 3
assert channels[1].settings.name == 'admin'
assert channels[2].settings.name == ''
assert channels[3].settings.name == ''
assert channels[4].settings.name == ''
assert channels[5].settings.name == ''
assert channels[6].settings.name == ''
assert channels[7].settings.name == ''
@pytest.mark.unit
def test_getChannelByName(capsys):
"""Get a channel by the name."""
anode = Node('foo', 'bar')
channel1 = Channel(index=1, role=1)
channel1.settings.modem_config = 3
channel1.settings.psk = b'\x01'
channel2 = Channel(index=2, role=2)
channel2.settings.psk = b'\x8a\x94y\x0e\xc6\xc9\x1e5\x91\x12@\xa60\xa8\xb43\x87\x00\xf2K\x0e\xe7\x7fAz\xcd\xf5\xb0\x900\xa84'
channel2.settings.name = 'admin'
channel3 = Channel(index=3, role=0)
channel4 = Channel(index=4, role=0)
channel5 = Channel(index=5, role=0)
channel6 = Channel(index=6, role=0)
channel7 = Channel(index=7, role=0)
channel8 = Channel(index=8, role=0)
channels = [ channel1, channel2, channel3, channel4, channel5, channel6, channel7, channel8 ]
anode.channels = channels
ch = anode.getChannelByName('admin')
assert ch.index == 2
@pytest.mark.unit
def test_getChannelByName_invalid_name(capsys):
"""Get a channel by the name but one that is not present."""
anode = Node('foo', 'bar')
channel1 = Channel(index=1, role=1)
channel1.settings.modem_config = 3
channel1.settings.psk = b'\x01'
channel2 = Channel(index=2, role=2)
channel2.settings.psk = b'\x8a\x94y\x0e\xc6\xc9\x1e5\x91\x12@\xa60\xa8\xb43\x87\x00\xf2K\x0e\xe7\x7fAz\xcd\xf5\xb0\x900\xa84'
channel2.settings.name = 'admin'
channel3 = Channel(index=3, role=0)
channel4 = Channel(index=4, role=0)
channel5 = Channel(index=5, role=0)
channel6 = Channel(index=6, role=0)
channel7 = Channel(index=7, role=0)
channel8 = Channel(index=8, role=0)
channels = [ channel1, channel2, channel3, channel4, channel5, channel6, channel7, channel8 ]
anode.channels = channels
ch = anode.getChannelByName('testing')
assert ch is None
@pytest.mark.unit
def test_getDisabledChannel(capsys):
"""Get the first disabled channel."""
anode = Node('foo', 'bar')
channel1 = Channel(index=1, role=1)
channel1.settings.modem_config = 3
channel1.settings.psk = b'\x01'
channel2 = Channel(index=2, role=2)
channel2.settings.psk = b'\x8a\x94y\x0e\xc6\xc9\x1e5\x91\x12@\xa60\xa8\xb43\x87\x00\xf2K\x0e\xe7\x7fAz\xcd\xf5\xb0\x900\xa84'
channel2.settings.name = 'testingA'
channel3 = Channel(index=3, role=2)
channel3.settings.psk = b'\x8a\x94y\x0e\xc6\xc9\x1e5\x91\x12@\xa60\xa8\xb43\x87\x00\xf2K\x0e\xe7\x7fAz\xcd\xf5\xb0\x900\xa84'
channel3.settings.name = 'testingB'
channel4 = Channel(index=4, role=0)
channel5 = Channel(index=5, role=0)
channel6 = Channel(index=6, role=0)
channel7 = Channel(index=7, role=0)
channel8 = Channel(index=8, role=0)
channels = [ channel1, channel2, channel3, channel4, channel5, channel6, channel7, channel8 ]
anode.channels = channels
ch = anode.getDisabledChannel()
assert ch.index == 4
@pytest.mark.unit
def test_getDisabledChannel_where_all_channels_are_used(capsys):
"""Get the first disabled channel."""
anode = Node('foo', 'bar')
channel1 = Channel(index=1, role=1)
channel1.settings.modem_config = 3
channel1.settings.psk = b'\x01'
channel2 = Channel(index=2, role=2)
channel3 = Channel(index=3, role=2)
channel4 = Channel(index=4, role=2)
channel5 = Channel(index=5, role=2)
channel6 = Channel(index=6, role=2)
channel7 = Channel(index=7, role=2)
channel8 = Channel(index=8, role=2)
channels = [ channel1, channel2, channel3, channel4, channel5, channel6, channel7, channel8 ]
anode.channels = channels
ch = anode.getDisabledChannel()
assert ch is None
@pytest.mark.unit
def test_getAdminChannelIndex(capsys):
"""Get the 'admin' channel index."""
anode = Node('foo', 'bar')
channel1 = Channel(index=1, role=1)
channel1.settings.modem_config = 3
channel1.settings.psk = b'\x01'
channel2 = Channel(index=2, role=2)
channel2.settings.psk = b'\x8a\x94y\x0e\xc6\xc9\x1e5\x91\x12@\xa60\xa8\xb43\x87\x00\xf2K\x0e\xe7\x7fAz\xcd\xf5\xb0\x900\xa84'
channel2.settings.name = 'admin'
channel3 = Channel(index=3, role=0)
channel4 = Channel(index=4, role=0)
channel5 = Channel(index=5, role=0)
channel6 = Channel(index=6, role=0)
channel7 = Channel(index=7, role=0)
channel8 = Channel(index=8, role=0)
channels = [ channel1, channel2, channel3, channel4, channel5, channel6, channel7, channel8 ]
anode.channels = channels
i = anode._getAdminChannelIndex()
assert i == 2
@pytest.mark.unit
def test_getAdminChannelIndex_when_no_admin_named_channel(capsys):
"""Get the 'admin' channel when there is not one."""
anode = Node('foo', 'bar')
channel1 = Channel(index=1, role=1)
channel1.settings.modem_config = 3
channel1.settings.psk = b'\x01'
channel2 = Channel(index=2, role=0)
channel3 = Channel(index=3, role=0)
channel4 = Channel(index=4, role=0)
channel5 = Channel(index=5, role=0)
channel6 = Channel(index=6, role=0)
channel7 = Channel(index=7, role=0)
channel8 = Channel(index=8, role=0)
channels = [ channel1, channel2, channel3, channel4, channel5, channel6, channel7, channel8 ]
anode.channels = channels
i = anode._getAdminChannelIndex()
assert i == 0
# TODO: should we check if we need to turn it off?
@pytest.mark.unit
def test_turnOffEncryptionOnPrimaryChannel(capsys):
"""Turn off encryption when there is a psk."""
anode = Node('foo', 'bar', noProto=True)
channel1 = Channel(index=1, role=1)
channel1.settings.modem_config = 3
# value from using "--ch-set psk 0x1a1a1a1a2b2b2b2b1a1a1a1a2b2b2b2b1a1a1a1a2b2b2b2b1a1a1a1a2b2b2b2b "
channel1.settings.psk = b'\x1a\x1a\x1a\x1a++++\x1a\x1a\x1a\x1a++++\x1a\x1a\x1a\x1a++++\x1a\x1a\x1a\x1a++++'
channel2 = Channel(index=2, role=0)
channel3 = Channel(index=3, role=0)
channel4 = Channel(index=4, role=0)
channel5 = Channel(index=5, role=0)
channel6 = Channel(index=6, role=0)
channel7 = Channel(index=7, role=0)
channel8 = Channel(index=8, role=0)
channels = [ channel1, channel2, channel3, channel4, channel5, channel6, channel7, channel8 ]
anode.channels = channels
anode.turnOffEncryptionOnPrimaryChannel()
out, err = capsys.readouterr()
assert re.search(r'Writing modified channels to device', out)
assert err == ''
@pytest.mark.unit
def test_writeConfig_with_no_radioConfig(capsys):
"""Test writeConfig with no radioConfig."""
anode = Node('foo', 'bar', noProto=True)
with pytest.raises(SystemExit) as pytest_wrapped_e:
anode.writeConfig()
assert pytest_wrapped_e.type == SystemExit
assert pytest_wrapped_e.value.code == 1
out, err = capsys.readouterr()
assert re.search(r'Error: No RadioConfig has been read', out)
assert err == ''
@pytest.mark.unit
def test_writeConfig(caplog):
"""Test writeConfig"""
anode = Node('foo', 'bar', noProto=True)
radioConfig = RadioConfig()
anode.radioConfig = radioConfig
with caplog.at_level(logging.DEBUG):
anode.writeConfig()
assert re.search(r'Wrote config', caplog.text, re.MULTILINE)
@pytest.mark.unit
def test_requestChannel_not_localNode(caplog):
"""Test _requestChannel()"""
iface = MagicMock(autospec=SerialInterface)
with patch('meshtastic.serial_interface.SerialInterface', return_value=iface) as mo:
mo.localNode.getChannelByName.return_value = None
mo.myInfo.max_channels = 8
anode = Node(mo, 'bar', noProto=True)
with caplog.at_level(logging.DEBUG):
anode._requestChannel(0)
assert re.search(r'Requesting channel 0 info from remote node', caplog.text, re.MULTILINE)
@pytest.mark.unit
def test_requestChannel_localNode(caplog):
"""Test _requestChannel()"""
iface = MagicMock(autospec=SerialInterface)
with patch('meshtastic.serial_interface.SerialInterface', return_value=iface) as mo:
mo.localNode.getChannelByName.return_value = None
mo.myInfo.max_channels = 8
anode = Node(mo, 'bar', noProto=True)
# Note: Have to do this next line because every call to MagicMock object/method returns a new magic mock
mo.localNode = anode
with caplog.at_level(logging.DEBUG):
anode._requestChannel(0)
assert re.search(r'Requesting channel 0', caplog.text, re.MULTILINE)
assert not re.search(r'from remote node', caplog.text, re.MULTILINE)
@pytest.mark.unit
def test_onResponseRequestChannel(caplog):
"""Test onResponseRequestChannel()"""
channel1 = Channel(index=1, role=1)
channel1.settings.modem_config = 3
channel1.settings.psk = b'\x01'
msg1 = MagicMock(autospec=AdminMessage)
msg1.get_channel_response = channel1
msg2 = MagicMock(autospec=AdminMessage)
channel2 = Channel(index=2, role=0) # disabled
msg2.get_channel_response = channel2
# default primary channel
packet1 = {
'from': 2475227164,
'to': 2475227164,
'decoded': {
'portnum': 'ADMIN_APP',
'payload': b':\t\x12\x05\x18\x03"\x01\x01\x18\x01',
'requestId': 2615094405,
'admin': {
'getChannelResponse': {
'settings': {
'modemConfig': 'Bw125Cr48Sf4096',
'psk': 'AQ=='
},
'role': 'PRIMARY'
},
'raw': msg1,
}
},
'id': 1692918436,
'hopLimit': 3,
'priority':
'RELIABLE',
'raw': 'fake',
'fromId': '!9388f81c',
'toId': '!9388f81c'
}
# no other channels
packet2 = {
'from': 2475227164,
'to': 2475227164,
'decoded': {
'portnum': 'ADMIN_APP',
'payload': b':\x04\x08\x02\x12\x00',
'requestId': 743049663,
'admin': {
'getChannelResponse': {
'index': 2,
'settings': {}
},
'raw': msg2,
}
},
'id': 1692918456,
'rxTime': 1640202239,
'hopLimit': 3,
'priority': 'RELIABLE',
'raw': 'faked',
'fromId': '!9388f81c',
'toId': '!9388f81c'
}
iface = MagicMock(autospec=SerialInterface)
with patch('meshtastic.serial_interface.SerialInterface', return_value=iface) as mo:
mo.localNode.getChannelByName.return_value = None
mo.myInfo.max_channels = 8
anode = Node(mo, 'bar', noProto=True)
radioConfig = RadioConfig()
anode.radioConfig = radioConfig
# Note: Have to do this next line because every call to MagicMock object/method returns a new magic mock
mo.localNode = anode
with caplog.at_level(logging.DEBUG):
anode.requestConfig()
anode.onResponseRequestChannel(packet1)
assert re.search(r'Received channel', caplog.text, re.MULTILINE)
anode.onResponseRequestChannel(packet2)
assert re.search(r'Received channel', caplog.text, re.MULTILINE)
assert re.search(r'Finished downloading channels', caplog.text, re.MULTILINE)
assert len(anode.channels) == 8
assert anode.channels[0].settings.modem_config == 3
assert anode.channels[1].settings.name == ''
assert anode.channels[2].settings.name == ''
assert anode.channels[3].settings.name == ''
assert anode.channels[4].settings.name == ''
assert anode.channels[5].settings.name == ''
assert anode.channels[6].settings.name == ''
assert anode.channels[7].settings.name == ''
@pytest.mark.unit
def test_onResponseRequestSetting(caplog):
"""Test onResponseRequestSetting()"""
# Note: Split out the get_radio_response to a MagicMock
# so it could be "returned" (not really sure how to do that
# in a python dict.
amsg = MagicMock(autospec=AdminMessage)
amsg.get_radio_response = """{
preferences {
phone_timeout_secs: 900
ls_secs: 300
position_broadcast_smart: true
position_flags: 35
}
}"""
packet = {
'from': 2475227164,
'to': 2475227164,
'decoded': {
'portnum': 'ADMIN_APP',
'payload': b'*\x0e\n\x0c0\x84\x07P\xac\x02\x88\x01\x01\xb0\t#',
'requestId': 3145147848,
'admin': {
'getRadioResponse': {
'preferences': {
'phoneTimeoutSecs': 900,
'lsSecs': 300,
'positionBroadcastSmart': True,
'positionFlags': 35
}
},
'raw': amsg
},
'id': 365963704,
'rxTime': 1640195197,
'hopLimit': 3,
'priority': 'RELIABLE',
'raw': 'faked',
'fromId': '!9388f81c',
'toId': '!9388f81c'
}
}
iface = MagicMock(autospec=SerialInterface)
with patch('meshtastic.serial_interface.SerialInterface', return_value=iface) as mo:
mo.localNode.getChannelByName.return_value = None
mo.myInfo.max_channels = 8
anode = Node(mo, 'bar', noProto=True)
radioConfig = RadioConfig()
anode.radioConfig = radioConfig
# Note: Have to do this next line because every call to MagicMock object/method returns a new magic mock
mo.localNode = anode
with caplog.at_level(logging.DEBUG):
anode.onResponseRequestSettings(packet)
assert re.search(r'Received radio config, now fetching channels..', caplog.text, re.MULTILINE)
@pytest.mark.unit
def test_onResponseRequestSetting_with_error(capsys):
"""Test onResponseRequestSetting() with an error"""
packet = {
'from': 2475227164,
'to': 2475227164,
'decoded': {
'portnum': 'ADMIN_APP',
'payload': b'*\x0e\n\x0c0\x84\x07P\xac\x02\x88\x01\x01\xb0\t#',
'requestId': 3145147848,
'routing': {
'errorReason': 'some made up error',
},
'admin': {
'getRadioResponse': {
'preferences': {
'phoneTimeoutSecs': 900,
'lsSecs': 300,
'positionBroadcastSmart': True,
'positionFlags': 35
}
},
},
'id': 365963704,
'rxTime': 1640195197,
'hopLimit': 3,
'priority': 'RELIABLE',
'fromId': '!9388f81c',
'toId': '!9388f81c'
}
}
iface = MagicMock(autospec=SerialInterface)
with patch('meshtastic.serial_interface.SerialInterface', return_value=iface) as mo:
mo.localNode.getChannelByName.return_value = None
mo.myInfo.max_channels = 8
anode = Node(mo, 'bar', noProto=True)
radioConfig = RadioConfig()
anode.radioConfig = radioConfig
# Note: Have to do this next line because every call to MagicMock object/method returns a new magic mock
mo.localNode = anode
anode.onResponseRequestSettings(packet)
out, err = capsys.readouterr()
assert re.search(r'Error on response', out)
assert err == ''