Merge pull request #171 from mkinney/more_testing

add unit tests for RemoteHardwareClient()
This commit is contained in:
mkinney
2021-12-20 23:27:45 -08:00
committed by GitHub
7 changed files with 167 additions and 26 deletions

View File

@@ -240,13 +240,15 @@ def onConnected(interface):
if args.sendtext:
closeNow = True
channelIndex = 0
if args.ch_index is not None:
channelIndex = int(args.ch_index)
print(f"Sending text message {args.sendtext} to {args.destOrAll}")
interface.sendText(args.sendtext, args.destOrAll,
wantAck=True)
interface.sendText(args.sendtext, args.destOrAll, wantAck=True, channelIndex=channelIndex)
if args.sendping:
print(f"Sending ping message {args.sendtext} to {args.destOrAll}")
payload = str.encode("test string")
print(f"Sending ping message to {args.destOrAll}")
interface.sendData(payload, args.destOrAll, portNum=portnums_pb2.PortNum.REPLY_APP,
wantAck=True, wantResponse=True)
@@ -259,8 +261,7 @@ def onConnected(interface):
for wrpair in (args.gpio_wrb or []):
bitmask |= 1 << int(wrpair[0])
bitval |= int(wrpair[1]) << int(wrpair[0])
print(
f"Writing GPIO mask 0x{bitmask:x} with value 0x{bitval:x} to {args.dest}")
print(f"Writing GPIO mask 0x{bitmask:x} with value 0x{bitval:x} to {args.dest}")
rhc.writeGPIOs(args.dest, bitmask, bitval)
closeNow = True
@@ -275,6 +276,7 @@ def onConnected(interface):
sys.exit(0) # Just force an exit (FIXME - ugly)
rhc.readGPIOs(args.dest, bitmask, onResponse)
time.sleep(10)
if args.gpio_watch:
bitmask = int(args.gpio_watch, 16)
@@ -752,13 +754,13 @@ def initParser():
action="store_true")
parser.add_argument(
"--gpio-wrb", nargs=2, help="Set a particlar GPIO # to 1 or 0", action='append')
"--gpio-wrb", nargs=2, help="Set a particular GPIO # to 1 or 0", action='append')
parser.add_argument(
"--gpio-rd", help="Read from a GPIO mask")
"--gpio-rd", help="Read from a GPIO mask (ex: '0x10')")
parser.add_argument(
"--gpio-watch", help="Start watching a GPIO mask for changes")
"--gpio-watch", help="Start watching a GPIO mask for changes (ex: '0x10')")
parser.add_argument(
"--no-time", help="Suppress sending the current time to the mesh", action="store_true")

View File

@@ -21,10 +21,6 @@ from . import portnums_pb2, mesh_pb2
from .util import stripnl, Timeout, our_exit
from .__init__ import LOCAL_ADDR, BROADCAST_NUM, BROADCAST_ADDR, ResponseHandler, publishingThread, OUR_APP_VERSION, protocols
defaultHopLimit = 3
class MeshInterface:
"""Interface class for meshtastic devices
@@ -56,6 +52,7 @@ class MeshInterface:
self.currentPacketId = random.randint(0, 0xffffffff)
self.nodesByNum = None
self.configId = None
self.defaultHopLimit = 3
def close(self):
"""Shutdown this interface"""
@@ -162,7 +159,7 @@ class MeshInterface:
destinationId=BROADCAST_ADDR,
wantAck=False,
wantResponse=False,
hopLimit=defaultHopLimit,
hopLimit=None,
onResponse=None,
channelIndex=0):
"""Send a utf8 string to some other node, if the node has a display it
@@ -184,6 +181,9 @@ class MeshInterface:
Returns the sent packet. The id field will be populated in this packet
and can be used to track future message acks/naks.
"""
if hopLimit is None:
hopLimit = self.defaultHopLimit
return self.sendData(text.encode("utf-8"), destinationId,
portNum=portnums_pb2.PortNum.TEXT_MESSAGE_APP,
wantAck=wantAck,
@@ -195,7 +195,7 @@ class MeshInterface:
def sendData(self, data, destinationId=BROADCAST_ADDR,
portNum=portnums_pb2.PortNum.PRIVATE_APP, wantAck=False,
wantResponse=False,
hopLimit=defaultHopLimit,
hopLimit=None,
onResponse=None,
channelIndex=0):
"""Send a data packet to some other node
@@ -219,6 +219,9 @@ class MeshInterface:
Returns the sent packet. The id field will be populated in this packet
and can be used to track future message acks/naks.
"""
if hopLimit is None:
hopLimit = self.defaultHopLimit
if getattr(data, "SerializeToString", None):
logging.debug(f"Serializing protobuf as data: {stripnl(data)}")
data = data.SerializeToString()
@@ -280,13 +283,15 @@ class MeshInterface:
def _sendPacket(self, meshPacket,
destinationId=BROADCAST_ADDR,
wantAck=False, hopLimit=defaultHopLimit):
wantAck=False, hopLimit=None):
"""Send a MeshPacket to the specified node (or if unspecified, broadcast).
You probably don't want this - use sendData instead.
Returns the sent packet. The id field will be populated in this packet and
can be used to track future message acks/naks.
"""
if hopLimit is None:
hopLimit = self.defaultHopLimit
# We allow users to talk to the local node before we've completed the full connection flow...
if(self.myInfo is not None and destinationId != self.myInfo.my_node_num):
@@ -365,7 +370,7 @@ class MeshInterface:
def _waitConnected(self):
"""Block until the initial node db download is complete, or timeout
and raise an exception"""
if not self.isConnected.wait(10.0): # timeout after 10 seconds
if not self.isConnected.wait(15.0): # timeout after x seconds
raise Exception("Timed out waiting for connection completion")
# If we failed while connecting, raise the connection to the client
@@ -456,7 +461,7 @@ class MeshInterface:
Called by subclasses."""
fromRadio = mesh_pb2.FromRadio()
fromRadio.ParseFromString(fromRadioBytes)
#logging.debug(f"fromRadioBytes: {fromRadioBytes}")
logging.debug(f"in mesh_interface.py _handleFromRadio() fromRadioBytes: {fromRadioBytes}")
asDict = google.protobuf.json_format.MessageToDict(fromRadio)
logging.debug(f"Received from radio: {fromRadio}")
if fromRadio.HasField("my_info"):
@@ -491,7 +496,8 @@ class MeshInterface:
self.nodesByNum[node["num"]] = node
if "user" in node: # Some nodes might not have user/ids assigned yet
self.nodes[node["user"]["id"]] = node
if "id" in node["user"]:
self.nodes[node["user"]["id"]] = node
publishingThread.queueWork(lambda: pub.sendMessage("meshtastic.node.updated",
node=node, interface=self))
elif fromRadio.config_complete_id == self.configId:

View File

@@ -20,7 +20,7 @@ class Node:
self.nodeNum = nodeNum
self.radioConfig = None
self.channels = None
self._timeout = Timeout(maxSecs=60)
self._timeout = Timeout(maxSecs=300)
self.partialChannels = None
self.noProto = noProto
@@ -49,6 +49,7 @@ class Node:
def requestConfig(self):
"""Send regular MeshPackets to ask for settings and channels."""
logging.debug(f"requestConfig for nodeNum:{self.nodeNum}")
self.radioConfig = None
self.channels = None
self.partialChannels = [] # We keep our channels in a temp array until finished
@@ -134,6 +135,7 @@ class Node:
def setOwner(self, long_name=None, short_name=None, is_licensed=False, team=None):
"""Set device owner name"""
logging.debug(f"in setOwner nodeNum:{self.nodeNum}")
nChars = 3
minChars = 2
if long_name is not None:
@@ -239,7 +241,13 @@ class Node:
# Show progress message for super slow operations
if self != self.iface.localNode:
print("Requesting preferences from remote node (this could take a while)")
print("Requesting preferences from remote node.")
print("Be sure:")
print(" 1. There is a SECONDARY channel named 'admin'.")
print(" 2. The '--seturl' was used to configure.")
print(" 3. All devices have the same modem config. (i.e., '--ch-longfast')")
print(" 4. All devices have been rebooted after all of the above. (optional, but recommended)")
print("Note: This could take a while (it requests remote channel configs, then writes config)")
return self._sendAdmin(p, wantResponse=True, onResponse=onResponse)
@@ -290,7 +298,8 @@ class Node:
# Show progress message for super slow operations
if self != self.iface.localNode:
logging.info(f"Requesting channel {channelNum} info from remote node (this could take a while)")
print(f"Requesting channel {channelNum} info from remote node (this could take a while)")
logging.debug(f"Requesting channel {channelNum} info from remote node (this could take a while)")
else:
logging.debug(f"Requesting channel {channelNum}")
@@ -322,6 +331,7 @@ class Node:
return self._sendAdmin(p, wantResponse=True, onResponse=onResponse)
# pylint: disable=R1710
def _sendAdmin(self, p: admin_pb2.AdminMessage, wantResponse=False,
onResponse=None, adminIndex=0):
@@ -332,6 +342,7 @@ class Node:
else:
if adminIndex == 0: # unless a special channel index was used, we want to use the admin index
adminIndex = self.iface.localNode._getAdminChannelIndex()
logging.debug(f'adminIndex:{adminIndex}')
return self.iface.sendData(p, self.nodeNum,
portNum=portnums_pb2.PortNum.ADMIN_APP,

View File

@@ -1,5 +1,6 @@
""" Remote hardware
"""Remote hardware
"""
import logging
from pubsub import pub
from . import portnums_pb2, remote_hardware_pb2
@@ -33,8 +34,7 @@ class RemoteHardwareClient:
"to use this (secured) service (--ch-add gpio --info then --seturl)")
self.channelIndex = ch.index
pub.subscribe(
onGPIOreceive, "meshtastic.receive.remotehw")
pub.subscribe(onGPIOreceive, "meshtastic.receive.remotehw")
def _sendHardware(self, nodeid, r, wantResponse=False, onResponse=None):
if not nodeid:
@@ -48,6 +48,7 @@ class RemoteHardwareClient:
Write the specified vals bits to the device GPIOs. Only bits in mask that
are 1 will be changed
"""
logging.debug(f'writeGPIOs nodeid:{nodeid} mask:{mask} vals:{vals}')
r = remote_hardware_pb2.HardwareMessage()
r.typ = remote_hardware_pb2.HardwareMessage.Type.WRITE_GPIOS
r.gpio_mask = mask
@@ -56,6 +57,7 @@ class RemoteHardwareClient:
def readGPIOs(self, nodeid, mask, onResponse = None):
"""Read the specified bits from GPIO inputs on the device"""
logging.debug(f'readGPIOs nodeid:{nodeid} mask:{mask}')
r = remote_hardware_pb2.HardwareMessage()
r.typ = remote_hardware_pb2.HardwareMessage.Type.READ_GPIOS
r.gpio_mask = mask
@@ -63,6 +65,7 @@ class RemoteHardwareClient:
def watchGPIOs(self, nodeid, mask):
"""Watch the specified bits from GPIO inputs on the device for changes"""
logging.debug(f'watchGPIOs nodeid:{nodeid} mask:{mask}')
r = remote_hardware_pb2.HardwareMessage()
r.typ = remote_hardware_pb2.HardwareMessage.Type.WATCH_GPIOS
r.gpio_mask = mask

View File

@@ -404,7 +404,7 @@ def test_main_sendtext(capsys, reset_globals):
Globals.getInstance().set_args(sys.argv)
iface = MagicMock(autospec=SerialInterface)
def mock_sendText(text, dest, wantAck):
def mock_sendText(text, dest, wantAck, channelIndex):
print('inside mocked sendText')
iface.sendText.side_effect = mock_sendText
@@ -425,7 +425,7 @@ def test_main_sendtext_with_dest(capsys, reset_globals):
Globals.getInstance().set_args(sys.argv)
iface = MagicMock(autospec=SerialInterface)
def mock_sendText(text, dest, wantAck):
def mock_sendText(text, dest, wantAck, channelIndex):
print('inside mocked sendText')
iface.sendText.side_effect = mock_sendText

View File

@@ -182,3 +182,37 @@ def test_handleFromRadio_with_node_info(reset_globals, caplog, capsys):
assert re.search(r'│ !28af67cc │ N/A │ N/A │ N/A', out, re.MULTILINE)
assert err == ''
iface.close()
@pytest.mark.unit
def test_handleFromRadio_with_node_info_tbeam1(reset_globals, caplog, capsys):
"""Test _handleFromRadio with node_info"""
# Note: Captured the '--debug --info' for the bytes below.
# pylint: disable=C0301
from_radio_bytes = b'"=\x08\x80\xf8\xc8\xf6\x07\x12"\n\t!7ed23c00\x12\x07TBeam 1\x1a\x02T1"\x06\x94\xb9~\xd2<\x000\x04\x1a\x07 ]MN\x01\xbea%\xad\x01\xbea=\x00\x00,A'
iface = MeshInterface(noProto=True)
with caplog.at_level(logging.DEBUG):
iface._startConfig()
iface._handleFromRadio(from_radio_bytes)
assert re.search(r'Received nodeinfo', caplog.text, re.MULTILINE)
assert re.search(r'TBeam 1', caplog.text, re.MULTILINE)
assert re.search(r'2127707136', caplog.text, re.MULTILINE)
# validate some of showNodes() output
iface.showNodes()
out, err = capsys.readouterr()
assert re.search(r' 1 ', out, re.MULTILINE)
assert re.search(r'│ TBeam 1 │ ', out, re.MULTILINE)
assert re.search(r'│ !7ed23c00 │', out, re.MULTILINE)
assert err == ''
iface.close()
@pytest.mark.unit
def test_handleFromRadio_with_node_info_tbeam_with_bad_data(reset_globals, caplog, capsys):
"""Test _handleFromRadio with node_info with some bad data (issue#172) - ensure we do not throw exception"""
# Note: Captured the '--debug --info' for the bytes below.
from_radio_bytes = b'"\x17\x08\xdc\x8a\x8a\xae\x02\x12\x08"\x06\x00\x00\x00\x00\x00\x00\x1a\x00=\x00\x00\xb8@'
iface = MeshInterface(noProto=True)
with caplog.at_level(logging.DEBUG):
iface._startConfig()
iface._handleFromRadio(from_radio_bytes)

View File

@@ -0,0 +1,85 @@
"""Meshtastic unit tests for remote_hardware.py"""
import logging
import re
from unittest.mock import patch, MagicMock
import pytest
from ..remote_hardware import RemoteHardwareClient, onGPIOreceive
from ..serial_interface import SerialInterface
@pytest.mark.unit
def test_RemoteHardwareClient():
"""Test that we can instantiate a RemoteHardwareClient instance"""
iface = MagicMock(autospec=SerialInterface)
rhw = RemoteHardwareClient(iface)
assert rhw.iface == iface
iface.close()
@pytest.mark.unit
def test_onGPIOreceive(capsys):
"""Test onGPIOreceive"""
iface = MagicMock(autospec=SerialInterface)
packet = {'decoded': {'remotehw': {'typ': 'foo', 'gpioValue': 'bar' }}}
onGPIOreceive(packet, iface)
out, err = capsys.readouterr()
assert re.search(r'Received RemoteHardware', out)
assert err == ''
@pytest.mark.unit
def test_RemoteHardwareClient_no_gpio_channel():
"""Test that we can instantiate a RemoteHardwareClient instance but cannot get channel gpio"""
iface = MagicMock(autospec=SerialInterface)
with patch('meshtastic.serial_interface.SerialInterface', return_value=iface) as mo:
mo.localNode.getChannelByName.return_value = None
with pytest.raises(Exception) as pytest_wrapped_e:
RemoteHardwareClient(mo)
assert pytest_wrapped_e.type == Exception
@pytest.mark.unit
def test_readGPIOs(caplog):
"""Test readGPIOs"""
iface = MagicMock(autospec=SerialInterface)
rhw = RemoteHardwareClient(iface)
with caplog.at_level(logging.DEBUG):
rhw.readGPIOs('0x10', 123)
assert re.search(r'readGPIOs', caplog.text, re.MULTILINE)
iface.close()
@pytest.mark.unit
def test_writeGPIOs(caplog):
"""Test writeGPIOs"""
iface = MagicMock(autospec=SerialInterface)
rhw = RemoteHardwareClient(iface)
with caplog.at_level(logging.DEBUG):
rhw.writeGPIOs('0x10', 123, 1)
assert re.search(r'writeGPIOs', caplog.text, re.MULTILINE)
iface.close()
@pytest.mark.unit
def test_watchGPIOs(caplog):
"""Test watchGPIOs"""
iface = MagicMock(autospec=SerialInterface)
rhw = RemoteHardwareClient(iface)
with caplog.at_level(logging.DEBUG):
rhw.watchGPIOs('0x10', 123)
assert re.search(r'watchGPIOs', caplog.text, re.MULTILINE)
iface.close()
@pytest.mark.unit
def test_sendHardware_no_nodeid():
"""Test sending no nodeid to _sendHardware()"""
iface = MagicMock(autospec=SerialInterface)
with patch('meshtastic.serial_interface.SerialInterface', return_value=iface) as mo:
with pytest.raises(Exception) as pytest_wrapped_e:
rhw = RemoteHardwareClient(mo)
rhw._sendHardware(None, None)
assert pytest_wrapped_e.type == Exception