diff --git a/meshtastic/remote_hardware.py b/meshtastic/remote_hardware.py index 5fa9f76..58159b0 100644 --- a/meshtastic/remote_hardware.py +++ b/meshtastic/remote_hardware.py @@ -1,5 +1,6 @@ -""" Remote hardware +"""Remote hardware """ +import logging from pubsub import pub from . import portnums_pb2, remote_hardware_pb2 @@ -33,8 +34,7 @@ class RemoteHardwareClient: "to use this (secured) service (--ch-add gpio --info then --seturl)") self.channelIndex = ch.index - pub.subscribe( - onGPIOreceive, "meshtastic.receive.remotehw") + pub.subscribe(onGPIOreceive, "meshtastic.receive.remotehw") def _sendHardware(self, nodeid, r, wantResponse=False, onResponse=None): if not nodeid: @@ -48,6 +48,7 @@ class RemoteHardwareClient: Write the specified vals bits to the device GPIOs. Only bits in mask that are 1 will be changed """ + logging.debug(f'writeGPIOs nodeid:{nodeid} mask:{mask} vals:{vals}') r = remote_hardware_pb2.HardwareMessage() r.typ = remote_hardware_pb2.HardwareMessage.Type.WRITE_GPIOS r.gpio_mask = mask @@ -56,6 +57,7 @@ class RemoteHardwareClient: def readGPIOs(self, nodeid, mask, onResponse = None): """Read the specified bits from GPIO inputs on the device""" + logging.debug(f'readGPIOs nodeid:{nodeid} mask:{mask}') r = remote_hardware_pb2.HardwareMessage() r.typ = remote_hardware_pb2.HardwareMessage.Type.READ_GPIOS r.gpio_mask = mask @@ -63,6 +65,7 @@ class RemoteHardwareClient: def watchGPIOs(self, nodeid, mask): """Watch the specified bits from GPIO inputs on the device for changes""" + logging.debug(f'watchGPIOs nodeid:{nodeid} mask:{mask}') r = remote_hardware_pb2.HardwareMessage() r.typ = remote_hardware_pb2.HardwareMessage.Type.WATCH_GPIOS r.gpio_mask = mask diff --git a/meshtastic/tests/test_remote_hardware.py b/meshtastic/tests/test_remote_hardware.py new file mode 100644 index 0000000..4f738ef --- /dev/null +++ b/meshtastic/tests/test_remote_hardware.py @@ -0,0 +1,85 @@ +"""Meshtastic unit tests for remote_hardware.py""" + +import logging +import re + +from unittest.mock import patch, MagicMock +import pytest + +from ..remote_hardware import RemoteHardwareClient, onGPIOreceive +from ..serial_interface import SerialInterface + + +@pytest.mark.unit +def test_RemoteHardwareClient(): + """Test that we can instantiate a RemoteHardwareClient instance""" + iface = MagicMock(autospec=SerialInterface) + rhw = RemoteHardwareClient(iface) + assert rhw.iface == iface + iface.close() + + +@pytest.mark.unit +def test_onGPIOreceive(capsys): + """Test onGPIOreceive""" + iface = MagicMock(autospec=SerialInterface) + packet = {'decoded': {'remotehw': {'typ': 'foo', 'gpioValue': 'bar' }}} + onGPIOreceive(packet, iface) + out, err = capsys.readouterr() + assert re.search(r'Received RemoteHardware', out) + assert err == '' + + +@pytest.mark.unit +def test_RemoteHardwareClient_no_gpio_channel(): + """Test that we can instantiate a RemoteHardwareClient instance but cannot get channel gpio""" + iface = MagicMock(autospec=SerialInterface) + with patch('meshtastic.serial_interface.SerialInterface', return_value=iface) as mo: + mo.localNode.getChannelByName.return_value = None + with pytest.raises(Exception) as pytest_wrapped_e: + RemoteHardwareClient(mo) + assert pytest_wrapped_e.type == Exception + + +@pytest.mark.unit +def test_readGPIOs(caplog): + """Test readGPIOs""" + iface = MagicMock(autospec=SerialInterface) + rhw = RemoteHardwareClient(iface) + with caplog.at_level(logging.DEBUG): + rhw.readGPIOs('0x10', 123) + assert re.search(r'readGPIOs', caplog.text, re.MULTILINE) + iface.close() + + +@pytest.mark.unit +def test_writeGPIOs(caplog): + """Test writeGPIOs""" + iface = MagicMock(autospec=SerialInterface) + rhw = RemoteHardwareClient(iface) + with caplog.at_level(logging.DEBUG): + rhw.writeGPIOs('0x10', 123, 1) + assert re.search(r'writeGPIOs', caplog.text, re.MULTILINE) + iface.close() + + +@pytest.mark.unit +def test_watchGPIOs(caplog): + """Test watchGPIOs""" + iface = MagicMock(autospec=SerialInterface) + rhw = RemoteHardwareClient(iface) + with caplog.at_level(logging.DEBUG): + rhw.watchGPIOs('0x10', 123) + assert re.search(r'watchGPIOs', caplog.text, re.MULTILINE) + iface.close() + + +@pytest.mark.unit +def test_sendHardware_no_nodeid(): + """Test sending no nodeid to _sendHardware()""" + iface = MagicMock(autospec=SerialInterface) + with patch('meshtastic.serial_interface.SerialInterface', return_value=iface) as mo: + with pytest.raises(Exception) as pytest_wrapped_e: + rhw = RemoteHardwareClient(mo) + rhw._sendHardware(None, None) + assert pytest_wrapped_e.type == Exception