Compare commits

...

8 Commits

Author SHA1 Message Date
mkinney
069056edad Update setup.py 2022-01-27 17:12:50 -08:00
github-actions
19bd510975 Update protobuf submodule 2022-01-28 01:12:41 +00:00
mkinney
7979efc0a1 Merge pull request #247 from mkinney/remote_hardware
remote hardware tests
2022-01-27 17:11:37 -08:00
Mike Kinney
66866a4c65 fix logic 2022-01-27 17:09:13 -08:00
Mike Kinney
e6fb066fe5 get last couple of lines covered in remote_hardware 2022-01-27 17:05:16 -08:00
Mike Kinney
5841979566 remove line 2022-01-27 17:00:53 -08:00
Mike Kinney
529f50edc6 remote hardware tests 2022-01-27 16:54:47 -08:00
github-actions
5895e8fb4d Update protobuf submodule 2022-01-27 14:21:10 +00:00
5 changed files with 168 additions and 16 deletions

View File

@@ -301,12 +301,11 @@ def onConnected(interface):
print(f"Reading GPIO mask 0x{bitmask:x} from {args.dest}") print(f"Reading GPIO mask 0x{bitmask:x} from {args.dest}")
interface.mask = bitmask interface.mask = bitmask
rhc.readGPIOs(args.dest, bitmask, None) rhc.readGPIOs(args.dest, bitmask, None)
if not interface.noProto: # wait up to X seconds for a response
# wait up to X seconds for a response for _ in range(10):
for _ in range(10): time.sleep(1)
time.sleep(1) if interface.gotResponse:
if interface.gotResponse: break
break
logging.debug(f'end of gpio_rd') logging.debug(f'end of gpio_rd')
if args.gpio_watch: if args.gpio_watch:

View File

@@ -10,8 +10,17 @@ def onGPIOreceive(packet, interface):
"""Callback for received GPIO responses """Callback for received GPIO responses
""" """
logging.debug(f"packet:{packet} interface:{interface}") logging.debug(f"packet:{packet} interface:{interface}")
gpioValue = 0
hw = packet["decoded"]["remotehw"] hw = packet["decoded"]["remotehw"]
gpioValue = hw["gpioValue"] if "gpioValue" in hw:
gpioValue = hw["gpioValue"]
else:
if not "gpioMask" in hw:
# we did get a reply, but due to protobufs, 0 for numeric value is not sent
# see https://developers.google.com/protocol-buffers/docs/proto3#default
# so, we set it here
gpioValue = 0
#print(f'mask:{interface.mask}') #print(f'mask:{interface.mask}')
value = int(gpioValue) & int(interface.mask) value = int(gpioValue) & int(interface.mask)
print(f'Received RemoteHardware typ={hw["typ"]}, gpio_value={gpioValue} value={value}') print(f'Received RemoteHardware typ={hw["typ"]}, gpio_value={gpioValue} value={value}')

View File

@@ -1755,18 +1755,18 @@ def test_main_gpio_rd_no_dest(capsys):
@pytest.mark.unit @pytest.mark.unit
@pytest.mark.usefixtures("reset_globals") @pytest.mark.usefixtures("reset_globals")
@patch('time.sleep')
def test_main_gpio_rd(caplog, capsys): def test_main_gpio_rd(caplog, capsys):
"""Test --gpio_rd with a named gpio channel""" """Test --gpio_rd with a named gpio channel"""
# Note: On the Heltec v2.1, there is a GPIO pin GPIO 13 that does not have a # Note: On the Heltec v2.1, there is a GPIO pin GPIO 13 that does not have a
# red arrow (meaning ok to use for our purposes) # red arrow (meaning ok to use for our purposes)
# See https://resource.heltec.cn/download/WiFi_LoRa_32/WIFI_LoRa_32_V2.pdf # See https://resource.heltec.cn/download/WiFi_LoRa_32/WIFI_LoRa_32_V2.pdf
# To find out the mask for GPIO 13, let us assign n as 13. # To find out the mask for GPIO 13, let us assign n as 13.
# 1. Subtract 1 from n (n is now 12) # 1. Find the 2^n or 2^13 (8192)
# 2. Find the 2^n or 2^12 (4096) # 2. Convert 8192 decimal to hex (0x2000)
# 3. Convert 4096 decimal to hex (0x1000)
# You can use python: # You can use python:
# >>> print(hex(2**12)) # >>> print(hex(2**13))
# 0x1000 # 0x2000
sys.argv = ['', '--gpio-rd', '0x1000', '--dest', '!1234'] sys.argv = ['', '--gpio-rd', '0x1000', '--dest', '!1234']
Globals.getInstance().set_args(sys.argv) Globals.getInstance().set_args(sys.argv)
@@ -1796,6 +1796,52 @@ def test_main_gpio_rd(caplog, capsys):
} }
} }
iface = MagicMock(autospec=SerialInterface)
iface.localNode.getChannelByName.return_value = channel
with patch('meshtastic.serial_interface.SerialInterface', return_value=iface) as mo:
with caplog.at_level(logging.DEBUG):
main()
onGPIOreceive(packet, mo)
out, err = capsys.readouterr()
assert re.search(r'Connected to radio', out, re.MULTILINE)
assert re.search(r'Reading GPIO mask 0x1000 ', out, re.MULTILINE)
assert re.search(r'Received RemoteHardware typ=READ_GPIOS_REPLY, gpio_value=4096', out, re.MULTILINE)
assert err == ''
@pytest.mark.unit
@pytest.mark.usefixtures("reset_globals")
@patch('time.sleep')
def test_main_gpio_rd_with_no_gpioMask(caplog, capsys):
"""Test --gpio_rd with a named gpio channel"""
sys.argv = ['', '--gpio-rd', '0x1000', '--dest', '!1234']
Globals.getInstance().set_args(sys.argv)
channel = Channel(index=1, role=1)
channel.settings.modem_config = 3
channel.settings.psk = b'\x01'
# Note: Intentionally do not have gpioValue in response as that is the
# default value
packet = {
'from': 682968668,
'to': 682968612,
'channel': 1,
'decoded': {
'portnum': 'REMOTE_HARDWARE_APP',
'payload': b'\x08\x05\x18\x80 ',
'requestId': 1629980484,
'remotehw': {
'typ': 'READ_GPIOS_REPLY',
'raw': 'faked',
'id': 1693085229,
'rxTime': 1640294262,
'rxSnr': 4.75,
'hopLimit': 3,
'wantAck': True,
}
}
}
iface = MagicMock(autospec=SerialInterface) iface = MagicMock(autospec=SerialInterface)
iface.localNode.getChannelByName.return_value = channel iface.localNode.getChannelByName.return_value = channel
@@ -1803,14 +1849,112 @@ def test_main_gpio_rd(caplog, capsys):
with caplog.at_level(logging.DEBUG): with caplog.at_level(logging.DEBUG):
main() main()
onGPIOreceive(packet, mo) onGPIOreceive(packet, mo)
assert re.search(r'readGPIOs nodeid:!1234 mask:4096', caplog.text, re.MULTILINE)
out, err = capsys.readouterr() out, err = capsys.readouterr()
assert re.search(r'Connected to radio', out, re.MULTILINE) assert re.search(r'Connected to radio', out, re.MULTILINE)
assert re.search(r'Reading GPIO mask 0x1000 ', out, re.MULTILINE) assert re.search(r'Reading GPIO mask 0x1000 ', out, re.MULTILINE)
assert re.search(r'Received RemoteHardware typ=READ_GPIOS_REPLY, gpio_value=4096', out, re.MULTILINE) assert re.search(r'Received RemoteHardware typ=READ_GPIOS_REPLY, gpio_value=0', out, re.MULTILINE)
assert err == '' assert err == ''
@pytest.mark.unit
@pytest.mark.usefixtures("reset_globals")
def test_main_gpio_watch(caplog, capsys):
"""Test --gpio_watch with a named gpio channel"""
sys.argv = ['', '--gpio-watch', '0x1000', '--dest', '!1234']
Globals.getInstance().set_args(sys.argv)
def my_sleep(amount):
print(f'{amount}')
sys.exit(3)
channel = Channel(index=1, role=1)
channel.settings.modem_config = 3
channel.settings.psk = b'\x01'
packet = {
'from': 682968668,
'to': 682968612,
'channel': 1,
'decoded': {
'portnum': 'REMOTE_HARDWARE_APP',
'payload': b'\x08\x05\x18\x80 ',
'requestId': 1629980484,
'remotehw': {
'typ': 'READ_GPIOS_REPLY',
'gpioValue': '4096',
'raw': 'faked',
'id': 1693085229,
'rxTime': 1640294262,
'rxSnr': 4.75,
'hopLimit': 3,
'wantAck': True,
}
}
}
with patch('time.sleep', side_effect=my_sleep):
with pytest.raises(SystemExit) as pytest_wrapped_e:
iface = MagicMock(autospec=SerialInterface)
iface.localNode.getChannelByName.return_value = channel
with patch('meshtastic.serial_interface.SerialInterface', return_value=iface) as mo:
with caplog.at_level(logging.DEBUG):
main()
onGPIOreceive(packet, mo)
assert pytest_wrapped_e.type == SystemExit
assert pytest_wrapped_e.value.code == 3
out, err = capsys.readouterr()
assert re.search(r'Connected to radio', out, re.MULTILINE)
assert re.search(r'Watching GPIO mask 0x1000 ', out, re.MULTILINE)
assert err == ''
@pytest.mark.unit
@pytest.mark.usefixtures("reset_globals")
def test_main_gpio_wrb(caplog, capsys):
"""Test --gpio_wrb with a named gpio channel"""
sys.argv = ['', '--gpio-wrb', '4', '1', '--dest', '!1234']
Globals.getInstance().set_args(sys.argv)
channel = Channel(index=1, role=1)
channel.settings.modem_config = 3
channel.settings.psk = b'\x01'
packet = {
'from': 682968668,
'to': 682968612,
'channel': 1,
'decoded': {
'portnum': 'REMOTE_HARDWARE_APP',
'payload': b'\x08\x05\x18\x80 ',
'requestId': 1629980484,
'remotehw': {
'typ': 'READ_GPIOS_REPLY',
'gpioValue': '16',
'raw': 'faked',
'id': 1693085229,
'rxTime': 1640294262,
'rxSnr': 4.75,
'hopLimit': 3,
'wantAck': True,
}
}
}
iface = MagicMock(autospec=SerialInterface)
iface.localNode.getChannelByName.return_value = channel
with patch('meshtastic.serial_interface.SerialInterface', return_value=iface) as mo:
with caplog.at_level(logging.DEBUG):
main()
onGPIOreceive(packet, mo)
out, err = capsys.readouterr()
assert re.search(r'Connected to radio', out, re.MULTILINE)
assert re.search(r'Writing GPIO mask 0x10 with value 0x10 to !1234', out, re.MULTILINE)
assert re.search(r'Received RemoteHardware typ=READ_GPIOS_REPLY, gpio_value=16 value=0', out, re.MULTILINE)
assert err == ''
@pytest.mark.unit @pytest.mark.unit
@pytest.mark.usefixtures("reset_globals") @pytest.mark.usefixtures("reset_globals")
def test_main_getPref_valid_field(capsys): def test_main_getPref_valid_field(capsys):

2
proto

Submodule proto updated: 785fb20a0d...07ed86d8b4

View File

@@ -12,7 +12,7 @@ with open("README.md", "r") as fh:
# This call to setup() does all the work # This call to setup() does all the work
setup( setup(
name="meshtastic", name="meshtastic",
version="1.2.76", version="1.2.77",
description="Python API & client shell for talking to Meshtastic devices", description="Python API & client shell for talking to Meshtastic devices",
long_description=long_description, long_description=long_description,
long_description_content_type="text/markdown", long_description_content_type="text/markdown",