mirror of
https://github.com/meshtastic/python.git
synced 2026-01-20 13:48:16 -05:00
4
Makefile
4
Makefile
@@ -10,6 +10,10 @@ install:
|
||||
lint:
|
||||
pylint meshtastic
|
||||
|
||||
# show the slowest unit tests
|
||||
slow:
|
||||
pytest --durations=0
|
||||
|
||||
# run the coverage report and open results in a browser
|
||||
cov:
|
||||
pytest --cov-report html --cov=meshtastic
|
||||
|
||||
12
README.md
12
README.md
@@ -199,6 +199,8 @@ pytest -vv
|
||||
pytest
|
||||
# or (more verbosely)
|
||||
pytest -m unit
|
||||
# or
|
||||
make
|
||||
```
|
||||
|
||||
* To run just integration tests:
|
||||
@@ -246,4 +248,14 @@ pytest -m smokewifi meshtastic/tests/test_smoke_wifi.py::test_smokewifi_info
|
||||
pytest --cov=meshtastic
|
||||
# or if want html coverage report
|
||||
pytest --cov-report html --cov=meshtastic
|
||||
# or
|
||||
make cov
|
||||
```
|
||||
|
||||
* To see slowest unit tests, you can run:
|
||||
|
||||
```
|
||||
pytest --durations=0
|
||||
# or
|
||||
make slow
|
||||
```
|
||||
|
||||
@@ -273,20 +273,22 @@ def onConnected(interface):
|
||||
if args.gpio_rd:
|
||||
bitmask = int(args.gpio_rd, 16)
|
||||
print(f"Reading GPIO mask 0x{bitmask:x} from {args.dest}")
|
||||
|
||||
def onResponse(packet):
|
||||
"""A closure to handle the response packet"""
|
||||
hw = packet["decoded"]["remotehw"]
|
||||
print(f'GPIO read response gpio_value={hw["gpioValue"]}')
|
||||
sys.exit(0) # Just force an exit (FIXME - ugly)
|
||||
|
||||
rhc.readGPIOs(args.dest, bitmask, onResponse)
|
||||
time.sleep(10)
|
||||
interface.mask = bitmask
|
||||
rhc.readGPIOs(args.dest, bitmask, None)
|
||||
if not interface.noProto:
|
||||
# wait up to X seconds for a response
|
||||
for _ in range(10):
|
||||
time.sleep(1)
|
||||
if interface.gotResponse:
|
||||
break
|
||||
logging.debug(f'end of gpio_rd')
|
||||
|
||||
if args.gpio_watch:
|
||||
bitmask = int(args.gpio_watch, 16)
|
||||
print(f"Watching GPIO mask 0x{bitmask:x} from {args.dest}")
|
||||
rhc.watchGPIOs(args.dest, bitmask)
|
||||
print(f"Watching GPIO mask 0x{bitmask:x} from {args.dest}. Press ctrl-c to exit")
|
||||
while True:
|
||||
rhc.watchGPIOs(args.dest, bitmask)
|
||||
time.sleep(1)
|
||||
|
||||
# handle settings
|
||||
if args.set:
|
||||
|
||||
@@ -53,6 +53,8 @@ class MeshInterface:
|
||||
self.nodesByNum = None
|
||||
self.configId = None
|
||||
self.defaultHopLimit = 3
|
||||
self.gotResponse = False # used in gpio read
|
||||
self.mask = None # used in gpio read and gpio watch
|
||||
|
||||
def close(self):
|
||||
"""Shutdown this interface"""
|
||||
@@ -216,6 +218,7 @@ class MeshInterface:
|
||||
onResponse -- A closure of the form funct(packet), that will be
|
||||
called when a response packet arrives (or the transaction
|
||||
is NAKed due to non receipt)
|
||||
channelIndex - channel number to use
|
||||
|
||||
Returns the sent packet. The id field will be populated in this packet
|
||||
and can be used to track future message acks/naks.
|
||||
@@ -338,8 +341,11 @@ class MeshInterface:
|
||||
meshPacket.id = self._generatePacketId()
|
||||
|
||||
toRadio.packet.CopyFrom(meshPacket)
|
||||
logging.debug(f"Sending packet: {stripnl(meshPacket)}")
|
||||
self._sendToRadio(toRadio)
|
||||
if self.noProto:
|
||||
logging.warning(f"Not sending packet because protocol use is disabled by noProto")
|
||||
else:
|
||||
logging.debug(f"Sending packet: {stripnl(meshPacket)}")
|
||||
self._sendToRadio(toRadio)
|
||||
return meshPacket
|
||||
|
||||
def waitForConfig(self):
|
||||
@@ -583,7 +589,6 @@ class MeshInterface:
|
||||
- meshtastic.receive.user(packet = MeshPacket dictionary)
|
||||
- meshtastic.receive.data(packet = MeshPacket dictionary)
|
||||
"""
|
||||
|
||||
asDict = google.protobuf.json_format.MessageToDict(meshPacket)
|
||||
|
||||
# We normally decompose the payload into a dictionary so that the client
|
||||
|
||||
@@ -3,14 +3,19 @@
|
||||
import logging
|
||||
from pubsub import pub
|
||||
from . import portnums_pb2, remote_hardware_pb2
|
||||
from .util import our_exit
|
||||
|
||||
|
||||
def onGPIOreceive(packet, interface):
|
||||
"""Callback for received GPIO responses
|
||||
|
||||
FIXME figure out how to do closures with methods in python"""
|
||||
"""
|
||||
logging.debug(f"packet:{packet} interface:{interface}")
|
||||
hw = packet["decoded"]["remotehw"]
|
||||
print(f'Received RemoteHardware typ={hw["typ"]}, gpio_value={hw["gpioValue"]}')
|
||||
gpioValue = hw["gpioValue"]
|
||||
#print(f'mask:{interface.mask}')
|
||||
value = int(gpioValue) & int(interface.mask)
|
||||
print(f'Received RemoteHardware typ={hw["typ"]}, gpio_value={gpioValue} value={value}')
|
||||
interface.gotResponse = True
|
||||
|
||||
|
||||
class RemoteHardwareClient:
|
||||
@@ -29,19 +34,21 @@ class RemoteHardwareClient:
|
||||
self.iface = iface
|
||||
ch = iface.localNode.getChannelByName("gpio")
|
||||
if not ch:
|
||||
raise Exception(
|
||||
"No gpio channel found, please create on the sending and receive nodes "\
|
||||
"to use this (secured) service (--ch-add gpio --info then --seturl)")
|
||||
our_exit(
|
||||
"Warning: No channel named 'gpio' was found.\n"\
|
||||
"On the sending and receive nodes create a channel named 'gpio'.\n"\
|
||||
"For example, run '--ch-add gpio' on one device, then '--seturl' on\n"\
|
||||
"the other devices using the url from the device where the channel was added.")
|
||||
self.channelIndex = ch.index
|
||||
|
||||
pub.subscribe(onGPIOreceive, "meshtastic.receive.remotehw")
|
||||
|
||||
def _sendHardware(self, nodeid, r, wantResponse=False, onResponse=None):
|
||||
if not nodeid:
|
||||
raise Exception(
|
||||
r"You must set a destination node ID for this operation (use --dest \!xxxxxxxxx)")
|
||||
our_exit(r"Warning: Must use a destination node ID for this operation (use --dest \!xxxxxxxxx)")
|
||||
return self.iface.sendData(r, nodeid, portnums_pb2.REMOTE_HARDWARE_APP,
|
||||
wantAck=True, channelIndex=self.channelIndex, wantResponse=wantResponse, onResponse=onResponse)
|
||||
wantAck=True, channelIndex=self.channelIndex,
|
||||
wantResponse=wantResponse, onResponse=onResponse)
|
||||
|
||||
def writeGPIOs(self, nodeid, mask, vals):
|
||||
"""
|
||||
@@ -69,4 +76,5 @@ class RemoteHardwareClient:
|
||||
r = remote_hardware_pb2.HardwareMessage()
|
||||
r.typ = remote_hardware_pb2.HardwareMessage.Type.WATCH_GPIOS
|
||||
r.gpio_mask = mask
|
||||
self.iface.mask = mask
|
||||
return self._sendHardware(nodeid, r)
|
||||
|
||||
@@ -4,6 +4,7 @@
|
||||
import sys
|
||||
import os
|
||||
import re
|
||||
import logging
|
||||
|
||||
from unittest.mock import patch, MagicMock
|
||||
import pytest
|
||||
@@ -15,6 +16,7 @@ from ..tcp_interface import TCPInterface
|
||||
from ..ble_interface import BLEInterface
|
||||
from ..node import Node
|
||||
from ..channel_pb2 import Channel
|
||||
from ..remote_hardware import onGPIOreceive
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
@@ -1306,3 +1308,99 @@ def test_main_export_config_called_from_main(capsys, reset_globals):
|
||||
assert re.search(r'# start of Meshtastic configure yaml', out, re.MULTILINE)
|
||||
assert err == ''
|
||||
mo.assert_called()
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
def test_main_gpio_rd_no_gpio_channel(capsys, reset_globals):
|
||||
"""Test --gpio_rd with no named gpio channel"""
|
||||
sys.argv = ['', '--gpio-rd', '0x10']
|
||||
Globals.getInstance().set_args(sys.argv)
|
||||
|
||||
iface = MagicMock(autospec=SerialInterface)
|
||||
iface.localNode.getChannelByName.return_value = None
|
||||
with patch('meshtastic.serial_interface.SerialInterface', return_value=iface):
|
||||
with pytest.raises(SystemExit) as pytest_wrapped_e:
|
||||
main()
|
||||
assert pytest_wrapped_e.type == SystemExit
|
||||
assert pytest_wrapped_e.value.code == 1
|
||||
out, err = capsys.readouterr()
|
||||
assert re.search(r'Warning: No channel named', out)
|
||||
assert err == ''
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
def test_main_gpio_rd_no_dest(capsys, reset_globals):
|
||||
"""Test --gpio_rd with a named gpio channel but no dest was specified"""
|
||||
sys.argv = ['', '--gpio-rd', '0x2000']
|
||||
Globals.getInstance().set_args(sys.argv)
|
||||
|
||||
channel = Channel(index=1, role=1)
|
||||
channel.settings.modem_config = 3
|
||||
channel.settings.psk = b'\x01'
|
||||
|
||||
iface = MagicMock(autospec=SerialInterface)
|
||||
iface.localNode.getChannelByName.return_value = channel
|
||||
with patch('meshtastic.serial_interface.SerialInterface', return_value=iface):
|
||||
with pytest.raises(SystemExit) as pytest_wrapped_e:
|
||||
main()
|
||||
assert pytest_wrapped_e.type == SystemExit
|
||||
assert pytest_wrapped_e.value.code == 1
|
||||
out, err = capsys.readouterr()
|
||||
assert re.search(r'Warning: Must use a destination node ID', out)
|
||||
assert err == ''
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
def test_main_gpio_rd(caplog, capsys, reset_globals):
|
||||
"""Test --gpio_rd with a named gpio channel"""
|
||||
# Note: On the Heltec v2.1, there is a GPIO pin GPIO 13 that does not have a
|
||||
# red arrow (meaning ok to use for our purposes)
|
||||
# See https://resource.heltec.cn/download/WiFi_LoRa_32/WIFI_LoRa_32_V2.pdf
|
||||
# To find out the mask for GPIO 13, let us assign n as 13.
|
||||
# 1. Subtract 1 from n (n is now 12)
|
||||
# 2. Find the 2^n or 2^12 (4096)
|
||||
# 3. Convert 4096 decimal to hex (0x1000)
|
||||
# You can use python:
|
||||
# >>> print(hex(2**12))
|
||||
# 0x1000
|
||||
sys.argv = ['', '--gpio-rd', '0x1000', '--dest', '!1234']
|
||||
Globals.getInstance().set_args(sys.argv)
|
||||
|
||||
channel = Channel(index=1, role=1)
|
||||
channel.settings.modem_config = 3
|
||||
channel.settings.psk = b'\x01'
|
||||
|
||||
packet = {
|
||||
'from': 682968668,
|
||||
'to': 682968612,
|
||||
'channel': 1,
|
||||
'decoded': {
|
||||
'portnum': 'REMOTE_HARDWARE_APP',
|
||||
'payload': b'\x08\x05\x18\x80 ',
|
||||
'requestId': 1629980484,
|
||||
'remotehw': {
|
||||
'typ': 'READ_GPIOS_REPLY',
|
||||
'gpioValue': '4096',
|
||||
'raw': 'faked',
|
||||
'id': 1693085229,
|
||||
'rxTime': 1640294262,
|
||||
'rxSnr': 4.75,
|
||||
'hopLimit': 3,
|
||||
'wantAck': True,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
iface = MagicMock(autospec=SerialInterface)
|
||||
iface.localNode.getChannelByName.return_value = channel
|
||||
with patch('meshtastic.serial_interface.SerialInterface', return_value=iface) as mo:
|
||||
with caplog.at_level(logging.DEBUG):
|
||||
main()
|
||||
onGPIOreceive(packet, mo)
|
||||
assert re.search(r'readGPIOs nodeid:!1234 mask:4096', caplog.text, re.MULTILINE)
|
||||
out, err = capsys.readouterr()
|
||||
assert re.search(r'Connected to radio', out, re.MULTILINE)
|
||||
assert re.search(r'Reading GPIO mask 0x1000 ', out, re.MULTILINE)
|
||||
assert re.search(r'Received RemoteHardware typ=READ_GPIOS_REPLY, gpio_value=4096', out, re.MULTILINE)
|
||||
assert err == ''
|
||||
|
||||
@@ -359,7 +359,7 @@ def test_sendPacket_with_destination_as_int(caplog, reset_globals):
|
||||
with caplog.at_level(logging.DEBUG):
|
||||
meshPacket = mesh_pb2.MeshPacket()
|
||||
iface._sendPacket(meshPacket, destinationId=123)
|
||||
assert re.search(r'Sending packet', caplog.text, re.MULTILINE)
|
||||
assert re.search(r'Not sending packet', caplog.text, re.MULTILINE)
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
@@ -369,7 +369,7 @@ def test_sendPacket_with_destination_starting_with_a_bang(caplog, reset_globals)
|
||||
with caplog.at_level(logging.DEBUG):
|
||||
meshPacket = mesh_pb2.MeshPacket()
|
||||
iface._sendPacket(meshPacket, destinationId='!1234')
|
||||
assert re.search(r'Sending packet', caplog.text, re.MULTILINE)
|
||||
assert re.search(r'Not sending packet', caplog.text, re.MULTILINE)
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
@@ -379,7 +379,7 @@ def test_sendPacket_with_destination_as_BROADCAST_ADDR(caplog, reset_globals):
|
||||
with caplog.at_level(logging.DEBUG):
|
||||
meshPacket = mesh_pb2.MeshPacket()
|
||||
iface._sendPacket(meshPacket, destinationId=BROADCAST_ADDR)
|
||||
assert re.search(r'Sending packet', caplog.text, re.MULTILINE)
|
||||
assert re.search(r'Not sending packet', caplog.text, re.MULTILINE)
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
@@ -405,7 +405,7 @@ def test_sendPacket_with_destination_as_LOCAL_ADDR_with_myInfo(caplog, reset_glo
|
||||
with caplog.at_level(logging.DEBUG):
|
||||
meshPacket = mesh_pb2.MeshPacket()
|
||||
iface._sendPacket(meshPacket, destinationId=LOCAL_ADDR)
|
||||
assert re.search(r'Sending packet', caplog.text, re.MULTILINE)
|
||||
assert re.search(r'Not sending packet', caplog.text, re.MULTILINE)
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
|
||||
@@ -23,7 +23,7 @@ def test_RemoteHardwareClient():
|
||||
def test_onGPIOreceive(capsys):
|
||||
"""Test onGPIOreceive"""
|
||||
iface = MagicMock(autospec=SerialInterface)
|
||||
packet = {'decoded': {'remotehw': {'typ': 'foo', 'gpioValue': 'bar' }}}
|
||||
packet = {'decoded': {'remotehw': {'typ': 'foo', 'gpioValue': '4096' }}}
|
||||
onGPIOreceive(packet, iface)
|
||||
out, err = capsys.readouterr()
|
||||
assert re.search(r'Received RemoteHardware', out)
|
||||
@@ -31,14 +31,18 @@ def test_onGPIOreceive(capsys):
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
def test_RemoteHardwareClient_no_gpio_channel():
|
||||
"""Test that we can instantiate a RemoteHardwareClient instance but cannot get channel gpio"""
|
||||
def test_RemoteHardwareClient_no_gpio_channel(capsys):
|
||||
"""Test that we can instantiate a RemoteHardwareClient instance but there is no channel named channel 'gpio'"""
|
||||
iface = MagicMock(autospec=SerialInterface)
|
||||
with patch('meshtastic.serial_interface.SerialInterface', return_value=iface) as mo:
|
||||
mo.localNode.getChannelByName.return_value = None
|
||||
with pytest.raises(Exception) as pytest_wrapped_e:
|
||||
with pytest.raises(SystemExit) as pytest_wrapped_e:
|
||||
RemoteHardwareClient(mo)
|
||||
assert pytest_wrapped_e.type == Exception
|
||||
assert pytest_wrapped_e.type == SystemExit
|
||||
assert pytest_wrapped_e.value.code == 1
|
||||
out, err = capsys.readouterr()
|
||||
assert re.search(r'Warning: No channel named', out)
|
||||
assert err == ""
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
@@ -79,7 +83,7 @@ def test_sendHardware_no_nodeid():
|
||||
"""Test sending no nodeid to _sendHardware()"""
|
||||
iface = MagicMock(autospec=SerialInterface)
|
||||
with patch('meshtastic.serial_interface.SerialInterface', return_value=iface) as mo:
|
||||
with pytest.raises(Exception) as pytest_wrapped_e:
|
||||
with pytest.raises(SystemExit) as pytest_wrapped_e:
|
||||
rhw = RemoteHardwareClient(mo)
|
||||
rhw._sendHardware(None, None)
|
||||
assert pytest_wrapped_e.type == Exception
|
||||
assert pytest_wrapped_e.type == SystemExit
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
[pytest]
|
||||
|
||||
addopts = -m "not smoke1 and not smoke2 and not smokewifi and not examples"
|
||||
addopts = -m "not int and not smoke1 and not smoke2 and not smokewifi and not examples"
|
||||
|
||||
markers =
|
||||
unit: marks tests as unit tests
|
||||
|
||||
Reference in New Issue
Block a user