got gpio read to work and one unit test

This commit is contained in:
Mike Kinney
2021-12-23 14:35:48 -08:00
parent 942600f399
commit fee052892b
6 changed files with 139 additions and 30 deletions

View File

@@ -273,15 +273,14 @@ def onConnected(interface):
if args.gpio_rd:
bitmask = int(args.gpio_rd, 16)
print(f"Reading GPIO mask 0x{bitmask:x} from {args.dest}")
def onResponse(packet):
"""A closure to handle the response packet"""
hw = packet["decoded"]["remotehw"]
print(f'GPIO read response gpio_value={hw["gpioValue"]}')
sys.exit(0) # Just force an exit (FIXME - ugly)
rhc.readGPIOs(args.dest, bitmask, onResponse)
time.sleep(10)
rhc.readGPIOs(args.dest, bitmask, None)
if not interface.noProto:
# wait up to X seconds for a response
for _ in range(10):
time.sleep(1)
if interface.gotResponse:
break
logging.debug(f'end of gpio_rd')
if args.gpio_watch:
bitmask = int(args.gpio_watch, 16)

View File

@@ -53,6 +53,7 @@ class MeshInterface:
self.nodesByNum = None
self.configId = None
self.defaultHopLimit = 3
self.gotResponse = False
def close(self):
"""Shutdown this interface"""
@@ -216,6 +217,7 @@ class MeshInterface:
onResponse -- A closure of the form funct(packet), that will be
called when a response packet arrives (or the transaction
is NAKed due to non receipt)
channelIndex - channel number to use
Returns the sent packet. The id field will be populated in this packet
and can be used to track future message acks/naks.
@@ -338,8 +340,11 @@ class MeshInterface:
meshPacket.id = self._generatePacketId()
toRadio.packet.CopyFrom(meshPacket)
logging.debug(f"Sending packet: {stripnl(meshPacket)}")
self._sendToRadio(toRadio)
if self.noProto:
logging.warning(f"Not sending packet because protocol use is disabled by noProto")
else:
logging.debug(f"Sending packet: {stripnl(meshPacket)}")
self._sendToRadio(toRadio)
return meshPacket
def waitForConfig(self):
@@ -583,7 +588,6 @@ class MeshInterface:
- meshtastic.receive.user(packet = MeshPacket dictionary)
- meshtastic.receive.data(packet = MeshPacket dictionary)
"""
asDict = google.protobuf.json_format.MessageToDict(meshPacket)
# We normally decompose the payload into a dictionary so that the client

View File

@@ -3,14 +3,16 @@
import logging
from pubsub import pub
from . import portnums_pb2, remote_hardware_pb2
from .util import our_exit
def onGPIOreceive(packet, interface):
"""Callback for received GPIO responses
FIXME figure out how to do closures with methods in python"""
"""
logging.debug(f"packet:{packet} interface:{interface}")
hw = packet["decoded"]["remotehw"]
print(f'Received RemoteHardware typ={hw["typ"]}, gpio_value={hw["gpioValue"]}')
interface.gotResponse = True
class RemoteHardwareClient:
@@ -29,19 +31,21 @@ class RemoteHardwareClient:
self.iface = iface
ch = iface.localNode.getChannelByName("gpio")
if not ch:
raise Exception(
"No gpio channel found, please create on the sending and receive nodes "\
"to use this (secured) service (--ch-add gpio --info then --seturl)")
our_exit(
"Warning: No channel named 'gpio' was found.\n"\
"On the sending and receive nodes create a channel named 'gpio'.\n"\
"For example, run '--ch-add gpio' on one device, then '--seturl' on\n"\
"the other devices using the url from the device where the channel was added.")
self.channelIndex = ch.index
pub.subscribe(onGPIOreceive, "meshtastic.receive.remotehw")
def _sendHardware(self, nodeid, r, wantResponse=False, onResponse=None):
if not nodeid:
raise Exception(
r"You must set a destination node ID for this operation (use --dest \!xxxxxxxxx)")
our_exit(r"Warning: Must use a destination node ID for this operation (use --dest \!xxxxxxxxx)")
return self.iface.sendData(r, nodeid, portnums_pb2.REMOTE_HARDWARE_APP,
wantAck=True, channelIndex=self.channelIndex, wantResponse=wantResponse, onResponse=onResponse)
wantAck=True, channelIndex=self.channelIndex,
wantResponse=wantResponse, onResponse=onResponse)
def writeGPIOs(self, nodeid, mask, vals):
"""

View File

@@ -4,6 +4,7 @@
import sys
import os
import re
import logging
from unittest.mock import patch, MagicMock
import pytest
@@ -15,6 +16,7 @@ from ..tcp_interface import TCPInterface
from ..ble_interface import BLEInterface
from ..node import Node
from ..channel_pb2 import Channel
from ..remote_hardware import onGPIOreceive
@pytest.mark.unit
@@ -1306,3 +1308,99 @@ def test_main_export_config_called_from_main(capsys, reset_globals):
assert re.search(r'# start of Meshtastic configure yaml', out, re.MULTILINE)
assert err == ''
mo.assert_called()
@pytest.mark.unit
def test_main_gpio_rd_no_gpio_channel(capsys, reset_globals):
"""Test --gpio_rd with no named gpio channel"""
sys.argv = ['', '--gpio-rd', '0x10']
Globals.getInstance().set_args(sys.argv)
iface = MagicMock(autospec=SerialInterface)
iface.localNode.getChannelByName.return_value = None
with patch('meshtastic.serial_interface.SerialInterface', return_value=iface):
with pytest.raises(SystemExit) as pytest_wrapped_e:
main()
assert pytest_wrapped_e.type == SystemExit
assert pytest_wrapped_e.value.code == 1
out, err = capsys.readouterr()
assert re.search(r'Warning: No channel named', out)
assert err == ''
@pytest.mark.unit
def test_main_gpio_rd_no_dest(capsys, reset_globals):
"""Test --gpio_rd with a named gpio channel but no dest was specified"""
sys.argv = ['', '--gpio-rd', '0x2000']
Globals.getInstance().set_args(sys.argv)
channel = Channel(index=1, role=1)
channel.settings.modem_config = 3
channel.settings.psk = b'\x01'
iface = MagicMock(autospec=SerialInterface)
iface.localNode.getChannelByName.return_value = channel
with patch('meshtastic.serial_interface.SerialInterface', return_value=iface):
with pytest.raises(SystemExit) as pytest_wrapped_e:
main()
assert pytest_wrapped_e.type == SystemExit
assert pytest_wrapped_e.value.code == 1
out, err = capsys.readouterr()
assert re.search(r'Warning: Must use a destination node ID', out)
assert err == ''
@pytest.mark.unit
def test_main_gpio_rd(caplog, capsys, reset_globals):
"""Test --gpio_rd with a named gpio channel"""
# Note: On the Heltec v2.1, there is a GPIO pin GPIO 13 that does not have a
# red arrow (meaning ok to use for our purposes)
# See https://resource.heltec.cn/download/WiFi_LoRa_32/WIFI_LoRa_32_V2.pdf
# To find out the mask for GPIO 13, let us assign n as 13.
# 1. Subtract 1 from n (n is now 12)
# 2. Find the 2^n or 2^12 (4096)
# 3. Convert 4096 decimal to hex (0x1000)
# You can use python:
# >>> print(hex(2**12))
# 0x1000
sys.argv = ['', '--gpio-rd', '0x1000', '--dest', '!1234']
Globals.getInstance().set_args(sys.argv)
channel = Channel(index=1, role=1)
channel.settings.modem_config = 3
channel.settings.psk = b'\x01'
packet = {
'from': 682968668,
'to': 682968612,
'channel': 1,
'decoded': {
'portnum': 'REMOTE_HARDWARE_APP',
'payload': b'\x08\x05\x18\x80 ',
'requestId': 1629980484,
'remotehw': {
'typ': 'READ_GPIOS_REPLY',
'gpioValue': '4096',
'raw': 'faked',
'id': 1693085229,
'rxTime': 1640294262,
'rxSnr': 4.75,
'hopLimit': 3,
'wantAck': True,
}
}
}
iface = MagicMock(autospec=SerialInterface)
iface.localNode.getChannelByName.return_value = channel
with patch('meshtastic.serial_interface.SerialInterface', return_value=iface) as mo:
with caplog.at_level(logging.DEBUG):
main()
onGPIOreceive(packet, mo)
assert re.search(r'readGPIOs nodeid:!1234 mask:4096', caplog.text, re.MULTILINE)
out, err = capsys.readouterr()
assert re.search(r'Connected to radio', out, re.MULTILINE)
assert re.search(r'Reading GPIO mask 0x1000 ', out, re.MULTILINE)
assert re.search(r'Received RemoteHardware typ=READ_GPIOS_REPLY, gpio_value=4096', out, re.MULTILINE)
assert err == ''

View File

@@ -359,7 +359,7 @@ def test_sendPacket_with_destination_as_int(caplog, reset_globals):
with caplog.at_level(logging.DEBUG):
meshPacket = mesh_pb2.MeshPacket()
iface._sendPacket(meshPacket, destinationId=123)
assert re.search(r'Sending packet', caplog.text, re.MULTILINE)
assert re.search(r'Not sending packet', caplog.text, re.MULTILINE)
@pytest.mark.unit
@@ -369,7 +369,7 @@ def test_sendPacket_with_destination_starting_with_a_bang(caplog, reset_globals)
with caplog.at_level(logging.DEBUG):
meshPacket = mesh_pb2.MeshPacket()
iface._sendPacket(meshPacket, destinationId='!1234')
assert re.search(r'Sending packet', caplog.text, re.MULTILINE)
assert re.search(r'Not sending packet', caplog.text, re.MULTILINE)
@pytest.mark.unit
@@ -379,7 +379,7 @@ def test_sendPacket_with_destination_as_BROADCAST_ADDR(caplog, reset_globals):
with caplog.at_level(logging.DEBUG):
meshPacket = mesh_pb2.MeshPacket()
iface._sendPacket(meshPacket, destinationId=BROADCAST_ADDR)
assert re.search(r'Sending packet', caplog.text, re.MULTILINE)
assert re.search(r'Not sending packet', caplog.text, re.MULTILINE)
@pytest.mark.unit
@@ -405,7 +405,7 @@ def test_sendPacket_with_destination_as_LOCAL_ADDR_with_myInfo(caplog, reset_glo
with caplog.at_level(logging.DEBUG):
meshPacket = mesh_pb2.MeshPacket()
iface._sendPacket(meshPacket, destinationId=LOCAL_ADDR)
assert re.search(r'Sending packet', caplog.text, re.MULTILINE)
assert re.search(r'Not sending packet', caplog.text, re.MULTILINE)
@pytest.mark.unit

View File

@@ -31,14 +31,18 @@ def test_onGPIOreceive(capsys):
@pytest.mark.unit
def test_RemoteHardwareClient_no_gpio_channel():
"""Test that we can instantiate a RemoteHardwareClient instance but cannot get channel gpio"""
def test_RemoteHardwareClient_no_gpio_channel(capsys):
"""Test that we can instantiate a RemoteHardwareClient instance but there is no channel named channel 'gpio'"""
iface = MagicMock(autospec=SerialInterface)
with patch('meshtastic.serial_interface.SerialInterface', return_value=iface) as mo:
mo.localNode.getChannelByName.return_value = None
with pytest.raises(Exception) as pytest_wrapped_e:
with pytest.raises(SystemExit) as pytest_wrapped_e:
RemoteHardwareClient(mo)
assert pytest_wrapped_e.type == Exception
assert pytest_wrapped_e.type == SystemExit
assert pytest_wrapped_e.value.code == 1
out, err = capsys.readouterr()
assert re.search(r'Warning: No channel named', out)
assert err == ""
@pytest.mark.unit
@@ -79,7 +83,7 @@ def test_sendHardware_no_nodeid():
"""Test sending no nodeid to _sendHardware()"""
iface = MagicMock(autospec=SerialInterface)
with patch('meshtastic.serial_interface.SerialInterface', return_value=iface) as mo:
with pytest.raises(Exception) as pytest_wrapped_e:
with pytest.raises(SystemExit) as pytest_wrapped_e:
rhw = RemoteHardwareClient(mo)
rhw._sendHardware(None, None)
assert pytest_wrapped_e.type == Exception
assert pytest_wrapped_e.type == SystemExit