mirror of
https://github.com/meshtastic/python.git
synced 2025-12-30 03:17:54 -05:00
add unit tests for RemoteHardwareClient()
This commit is contained in:
@@ -1,5 +1,6 @@
|
||||
""" Remote hardware
|
||||
"""Remote hardware
|
||||
"""
|
||||
import logging
|
||||
from pubsub import pub
|
||||
from . import portnums_pb2, remote_hardware_pb2
|
||||
|
||||
@@ -33,8 +34,7 @@ class RemoteHardwareClient:
|
||||
"to use this (secured) service (--ch-add gpio --info then --seturl)")
|
||||
self.channelIndex = ch.index
|
||||
|
||||
pub.subscribe(
|
||||
onGPIOreceive, "meshtastic.receive.remotehw")
|
||||
pub.subscribe(onGPIOreceive, "meshtastic.receive.remotehw")
|
||||
|
||||
def _sendHardware(self, nodeid, r, wantResponse=False, onResponse=None):
|
||||
if not nodeid:
|
||||
@@ -48,6 +48,7 @@ class RemoteHardwareClient:
|
||||
Write the specified vals bits to the device GPIOs. Only bits in mask that
|
||||
are 1 will be changed
|
||||
"""
|
||||
logging.debug(f'writeGPIOs nodeid:{nodeid} mask:{mask} vals:{vals}')
|
||||
r = remote_hardware_pb2.HardwareMessage()
|
||||
r.typ = remote_hardware_pb2.HardwareMessage.Type.WRITE_GPIOS
|
||||
r.gpio_mask = mask
|
||||
@@ -56,6 +57,7 @@ class RemoteHardwareClient:
|
||||
|
||||
def readGPIOs(self, nodeid, mask, onResponse = None):
|
||||
"""Read the specified bits from GPIO inputs on the device"""
|
||||
logging.debug(f'readGPIOs nodeid:{nodeid} mask:{mask}')
|
||||
r = remote_hardware_pb2.HardwareMessage()
|
||||
r.typ = remote_hardware_pb2.HardwareMessage.Type.READ_GPIOS
|
||||
r.gpio_mask = mask
|
||||
@@ -63,6 +65,7 @@ class RemoteHardwareClient:
|
||||
|
||||
def watchGPIOs(self, nodeid, mask):
|
||||
"""Watch the specified bits from GPIO inputs on the device for changes"""
|
||||
logging.debug(f'watchGPIOs nodeid:{nodeid} mask:{mask}')
|
||||
r = remote_hardware_pb2.HardwareMessage()
|
||||
r.typ = remote_hardware_pb2.HardwareMessage.Type.WATCH_GPIOS
|
||||
r.gpio_mask = mask
|
||||
|
||||
85
meshtastic/tests/test_remote_hardware.py
Normal file
85
meshtastic/tests/test_remote_hardware.py
Normal file
@@ -0,0 +1,85 @@
|
||||
"""Meshtastic unit tests for remote_hardware.py"""
|
||||
|
||||
import logging
|
||||
import re
|
||||
|
||||
from unittest.mock import patch, MagicMock
|
||||
import pytest
|
||||
|
||||
from ..remote_hardware import RemoteHardwareClient, onGPIOreceive
|
||||
from ..serial_interface import SerialInterface
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
def test_RemoteHardwareClient():
|
||||
"""Test that we can instantiate a RemoteHardwareClient instance"""
|
||||
iface = MagicMock(autospec=SerialInterface)
|
||||
rhw = RemoteHardwareClient(iface)
|
||||
assert rhw.iface == iface
|
||||
iface.close()
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
def test_onGPIOreceive(capsys):
|
||||
"""Test onGPIOreceive"""
|
||||
iface = MagicMock(autospec=SerialInterface)
|
||||
packet = {'decoded': {'remotehw': {'typ': 'foo', 'gpioValue': 'bar' }}}
|
||||
onGPIOreceive(packet, iface)
|
||||
out, err = capsys.readouterr()
|
||||
assert re.search(r'Received RemoteHardware', out)
|
||||
assert err == ''
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
def test_RemoteHardwareClient_no_gpio_channel():
|
||||
"""Test that we can instantiate a RemoteHardwareClient instance but cannot get channel gpio"""
|
||||
iface = MagicMock(autospec=SerialInterface)
|
||||
with patch('meshtastic.serial_interface.SerialInterface', return_value=iface) as mo:
|
||||
mo.localNode.getChannelByName.return_value = None
|
||||
with pytest.raises(Exception) as pytest_wrapped_e:
|
||||
RemoteHardwareClient(mo)
|
||||
assert pytest_wrapped_e.type == Exception
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
def test_readGPIOs(caplog):
|
||||
"""Test readGPIOs"""
|
||||
iface = MagicMock(autospec=SerialInterface)
|
||||
rhw = RemoteHardwareClient(iface)
|
||||
with caplog.at_level(logging.DEBUG):
|
||||
rhw.readGPIOs('0x10', 123)
|
||||
assert re.search(r'readGPIOs', caplog.text, re.MULTILINE)
|
||||
iface.close()
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
def test_writeGPIOs(caplog):
|
||||
"""Test writeGPIOs"""
|
||||
iface = MagicMock(autospec=SerialInterface)
|
||||
rhw = RemoteHardwareClient(iface)
|
||||
with caplog.at_level(logging.DEBUG):
|
||||
rhw.writeGPIOs('0x10', 123, 1)
|
||||
assert re.search(r'writeGPIOs', caplog.text, re.MULTILINE)
|
||||
iface.close()
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
def test_watchGPIOs(caplog):
|
||||
"""Test watchGPIOs"""
|
||||
iface = MagicMock(autospec=SerialInterface)
|
||||
rhw = RemoteHardwareClient(iface)
|
||||
with caplog.at_level(logging.DEBUG):
|
||||
rhw.watchGPIOs('0x10', 123)
|
||||
assert re.search(r'watchGPIOs', caplog.text, re.MULTILINE)
|
||||
iface.close()
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
def test_sendHardware_no_nodeid():
|
||||
"""Test sending no nodeid to _sendHardware()"""
|
||||
iface = MagicMock(autospec=SerialInterface)
|
||||
with patch('meshtastic.serial_interface.SerialInterface', return_value=iface) as mo:
|
||||
with pytest.raises(Exception) as pytest_wrapped_e:
|
||||
rhw = RemoteHardwareClient(mo)
|
||||
rhw._sendHardware(None, None)
|
||||
assert pytest_wrapped_e.type == Exception
|
||||
Reference in New Issue
Block a user