diff --git a/meshtastic/__main__.py b/meshtastic/__main__.py index 353a05d..f0813b0 100644 --- a/meshtastic/__main__.py +++ b/meshtastic/__main__.py @@ -301,12 +301,11 @@ def onConnected(interface): print(f"Reading GPIO mask 0x{bitmask:x} from {args.dest}") interface.mask = bitmask rhc.readGPIOs(args.dest, bitmask, None) - if not interface.noProto: - # wait up to X seconds for a response - for _ in range(10): - time.sleep(1) - if interface.gotResponse: - break + # wait up to X seconds for a response + for _ in range(10): + time.sleep(1) + if interface.gotResponse: + break logging.debug(f'end of gpio_rd') if args.gpio_watch: diff --git a/meshtastic/remote_hardware.py b/meshtastic/remote_hardware.py index 832dd63..07ad96b 100644 --- a/meshtastic/remote_hardware.py +++ b/meshtastic/remote_hardware.py @@ -10,8 +10,17 @@ def onGPIOreceive(packet, interface): """Callback for received GPIO responses """ logging.debug(f"packet:{packet} interface:{interface}") + gpioValue = 0 hw = packet["decoded"]["remotehw"] - gpioValue = hw["gpioValue"] + if "gpioValue" in hw: + gpioValue = hw["gpioValue"] + else: + if "gpioMask" in hw: + # we did get a reply, but due to protobufs, 0 for numeric value is not sent + # see https://developers.google.com/protocol-buffers/docs/proto3#default + # so, we set it here + gpioValue = 0 + #print(f'mask:{interface.mask}') value = int(gpioValue) & int(interface.mask) print(f'Received RemoteHardware typ={hw["typ"]}, gpio_value={gpioValue} value={value}') diff --git a/meshtastic/tests/test_main.py b/meshtastic/tests/test_main.py index ce01da5..f4bfbcc 100644 --- a/meshtastic/tests/test_main.py +++ b/meshtastic/tests/test_main.py @@ -1755,18 +1755,18 @@ def test_main_gpio_rd_no_dest(capsys): @pytest.mark.unit @pytest.mark.usefixtures("reset_globals") +@patch('time.sleep') def test_main_gpio_rd(caplog, capsys): """Test --gpio_rd with a named gpio channel""" # Note: On the Heltec v2.1, there is a GPIO pin GPIO 13 that does not have a # red arrow (meaning ok to use for our purposes) # See https://resource.heltec.cn/download/WiFi_LoRa_32/WIFI_LoRa_32_V2.pdf # To find out the mask for GPIO 13, let us assign n as 13. - # 1. Subtract 1 from n (n is now 12) - # 2. Find the 2^n or 2^12 (4096) - # 3. Convert 4096 decimal to hex (0x1000) + # 1. Find the 2^n or 2^13 (8192) + # 2. Convert 8192 decimal to hex (0x2000) # You can use python: - # >>> print(hex(2**12)) - # 0x1000 + # >>> print(hex(2**13)) + # 0x2000 sys.argv = ['', '--gpio-rd', '0x1000', '--dest', '!1234'] Globals.getInstance().set_args(sys.argv) @@ -1796,7 +1796,6 @@ def test_main_gpio_rd(caplog, capsys): } } - iface = MagicMock(autospec=SerialInterface) iface.localNode.getChannelByName.return_value = channel with patch('meshtastic.serial_interface.SerialInterface', return_value=iface) as mo: @@ -1811,6 +1810,105 @@ def test_main_gpio_rd(caplog, capsys): assert err == '' +@pytest.mark.unit +@pytest.mark.usefixtures("reset_globals") +def test_main_gpio_watch(caplog, capsys): + """Test --gpio_watch with a named gpio channel""" + sys.argv = ['', '--gpio-watch', '0x1000', '--dest', '!1234'] + Globals.getInstance().set_args(sys.argv) + + def my_sleep(amount): + print(f'{amount}') + sys.exit(3) + + channel = Channel(index=1, role=1) + channel.settings.modem_config = 3 + channel.settings.psk = b'\x01' + + packet = { + + 'from': 682968668, + 'to': 682968612, + 'channel': 1, + 'decoded': { + 'portnum': 'REMOTE_HARDWARE_APP', + 'payload': b'\x08\x05\x18\x80 ', + 'requestId': 1629980484, + 'remotehw': { + 'typ': 'READ_GPIOS_REPLY', + 'gpioValue': '4096', + 'raw': 'faked', + 'id': 1693085229, + 'rxTime': 1640294262, + 'rxSnr': 4.75, + 'hopLimit': 3, + 'wantAck': True, + } + } + } + + with patch('time.sleep', side_effect=my_sleep): + with pytest.raises(SystemExit) as pytest_wrapped_e: + iface = MagicMock(autospec=SerialInterface) + iface.localNode.getChannelByName.return_value = channel + with patch('meshtastic.serial_interface.SerialInterface', return_value=iface) as mo: + with caplog.at_level(logging.DEBUG): + main() + onGPIOreceive(packet, mo) + assert pytest_wrapped_e.type == SystemExit + assert pytest_wrapped_e.value.code == 3 + out, err = capsys.readouterr() + assert re.search(r'Connected to radio', out, re.MULTILINE) + assert re.search(r'Watching GPIO mask 0x1000 ', out, re.MULTILINE) + assert err == '' + + +@pytest.mark.unit +@pytest.mark.usefixtures("reset_globals") +def test_main_gpio_wrb(caplog, capsys): + """Test --gpio_wrb with a named gpio channel""" + sys.argv = ['', '--gpio-wrb', '4', '1', '--dest', '!1234'] + Globals.getInstance().set_args(sys.argv) + + channel = Channel(index=1, role=1) + channel.settings.modem_config = 3 + channel.settings.psk = b'\x01' + + packet = { + + 'from': 682968668, + 'to': 682968612, + 'channel': 1, + 'decoded': { + 'portnum': 'REMOTE_HARDWARE_APP', + 'payload': b'\x08\x05\x18\x80 ', + 'requestId': 1629980484, + 'remotehw': { + 'typ': 'READ_GPIOS_REPLY', + 'gpioValue': '16', + 'raw': 'faked', + 'id': 1693085229, + 'rxTime': 1640294262, + 'rxSnr': 4.75, + 'hopLimit': 3, + 'wantAck': True, + } + } + } + + + iface = MagicMock(autospec=SerialInterface) + iface.localNode.getChannelByName.return_value = channel + with patch('meshtastic.serial_interface.SerialInterface', return_value=iface) as mo: + with caplog.at_level(logging.DEBUG): + main() + onGPIOreceive(packet, mo) + out, err = capsys.readouterr() + assert re.search(r'Connected to radio', out, re.MULTILINE) + assert re.search(r'Writing GPIO mask 0x10 with value 0x10 to !1234', out, re.MULTILINE) + assert re.search(r'Received RemoteHardware typ=READ_GPIOS_REPLY, gpio_value=16 value=0', out, re.MULTILINE) + assert err == '' + @pytest.mark.unit @pytest.mark.usefixtures("reset_globals") def test_main_getPref_valid_field(capsys):