mirror of
https://github.com/meshtastic/python.git
synced 2026-01-01 12:27:59 -05:00
Compare commits
8 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
069056edad | ||
|
|
19bd510975 | ||
|
|
7979efc0a1 | ||
|
|
66866a4c65 | ||
|
|
e6fb066fe5 | ||
|
|
5841979566 | ||
|
|
529f50edc6 | ||
|
|
5895e8fb4d |
@@ -301,12 +301,11 @@ def onConnected(interface):
|
|||||||
print(f"Reading GPIO mask 0x{bitmask:x} from {args.dest}")
|
print(f"Reading GPIO mask 0x{bitmask:x} from {args.dest}")
|
||||||
interface.mask = bitmask
|
interface.mask = bitmask
|
||||||
rhc.readGPIOs(args.dest, bitmask, None)
|
rhc.readGPIOs(args.dest, bitmask, None)
|
||||||
if not interface.noProto:
|
# wait up to X seconds for a response
|
||||||
# wait up to X seconds for a response
|
for _ in range(10):
|
||||||
for _ in range(10):
|
time.sleep(1)
|
||||||
time.sleep(1)
|
if interface.gotResponse:
|
||||||
if interface.gotResponse:
|
break
|
||||||
break
|
|
||||||
logging.debug(f'end of gpio_rd')
|
logging.debug(f'end of gpio_rd')
|
||||||
|
|
||||||
if args.gpio_watch:
|
if args.gpio_watch:
|
||||||
|
|||||||
@@ -10,8 +10,17 @@ def onGPIOreceive(packet, interface):
|
|||||||
"""Callback for received GPIO responses
|
"""Callback for received GPIO responses
|
||||||
"""
|
"""
|
||||||
logging.debug(f"packet:{packet} interface:{interface}")
|
logging.debug(f"packet:{packet} interface:{interface}")
|
||||||
|
gpioValue = 0
|
||||||
hw = packet["decoded"]["remotehw"]
|
hw = packet["decoded"]["remotehw"]
|
||||||
gpioValue = hw["gpioValue"]
|
if "gpioValue" in hw:
|
||||||
|
gpioValue = hw["gpioValue"]
|
||||||
|
else:
|
||||||
|
if not "gpioMask" in hw:
|
||||||
|
# we did get a reply, but due to protobufs, 0 for numeric value is not sent
|
||||||
|
# see https://developers.google.com/protocol-buffers/docs/proto3#default
|
||||||
|
# so, we set it here
|
||||||
|
gpioValue = 0
|
||||||
|
|
||||||
#print(f'mask:{interface.mask}')
|
#print(f'mask:{interface.mask}')
|
||||||
value = int(gpioValue) & int(interface.mask)
|
value = int(gpioValue) & int(interface.mask)
|
||||||
print(f'Received RemoteHardware typ={hw["typ"]}, gpio_value={gpioValue} value={value}')
|
print(f'Received RemoteHardware typ={hw["typ"]}, gpio_value={gpioValue} value={value}')
|
||||||
|
|||||||
@@ -1755,18 +1755,18 @@ def test_main_gpio_rd_no_dest(capsys):
|
|||||||
|
|
||||||
@pytest.mark.unit
|
@pytest.mark.unit
|
||||||
@pytest.mark.usefixtures("reset_globals")
|
@pytest.mark.usefixtures("reset_globals")
|
||||||
|
@patch('time.sleep')
|
||||||
def test_main_gpio_rd(caplog, capsys):
|
def test_main_gpio_rd(caplog, capsys):
|
||||||
"""Test --gpio_rd with a named gpio channel"""
|
"""Test --gpio_rd with a named gpio channel"""
|
||||||
# Note: On the Heltec v2.1, there is a GPIO pin GPIO 13 that does not have a
|
# Note: On the Heltec v2.1, there is a GPIO pin GPIO 13 that does not have a
|
||||||
# red arrow (meaning ok to use for our purposes)
|
# red arrow (meaning ok to use for our purposes)
|
||||||
# See https://resource.heltec.cn/download/WiFi_LoRa_32/WIFI_LoRa_32_V2.pdf
|
# See https://resource.heltec.cn/download/WiFi_LoRa_32/WIFI_LoRa_32_V2.pdf
|
||||||
# To find out the mask for GPIO 13, let us assign n as 13.
|
# To find out the mask for GPIO 13, let us assign n as 13.
|
||||||
# 1. Subtract 1 from n (n is now 12)
|
# 1. Find the 2^n or 2^13 (8192)
|
||||||
# 2. Find the 2^n or 2^12 (4096)
|
# 2. Convert 8192 decimal to hex (0x2000)
|
||||||
# 3. Convert 4096 decimal to hex (0x1000)
|
|
||||||
# You can use python:
|
# You can use python:
|
||||||
# >>> print(hex(2**12))
|
# >>> print(hex(2**13))
|
||||||
# 0x1000
|
# 0x2000
|
||||||
sys.argv = ['', '--gpio-rd', '0x1000', '--dest', '!1234']
|
sys.argv = ['', '--gpio-rd', '0x1000', '--dest', '!1234']
|
||||||
Globals.getInstance().set_args(sys.argv)
|
Globals.getInstance().set_args(sys.argv)
|
||||||
|
|
||||||
@@ -1796,6 +1796,52 @@ def test_main_gpio_rd(caplog, capsys):
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
iface = MagicMock(autospec=SerialInterface)
|
||||||
|
iface.localNode.getChannelByName.return_value = channel
|
||||||
|
with patch('meshtastic.serial_interface.SerialInterface', return_value=iface) as mo:
|
||||||
|
with caplog.at_level(logging.DEBUG):
|
||||||
|
main()
|
||||||
|
onGPIOreceive(packet, mo)
|
||||||
|
out, err = capsys.readouterr()
|
||||||
|
assert re.search(r'Connected to radio', out, re.MULTILINE)
|
||||||
|
assert re.search(r'Reading GPIO mask 0x1000 ', out, re.MULTILINE)
|
||||||
|
assert re.search(r'Received RemoteHardware typ=READ_GPIOS_REPLY, gpio_value=4096', out, re.MULTILINE)
|
||||||
|
assert err == ''
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.unit
|
||||||
|
@pytest.mark.usefixtures("reset_globals")
|
||||||
|
@patch('time.sleep')
|
||||||
|
def test_main_gpio_rd_with_no_gpioMask(caplog, capsys):
|
||||||
|
"""Test --gpio_rd with a named gpio channel"""
|
||||||
|
sys.argv = ['', '--gpio-rd', '0x1000', '--dest', '!1234']
|
||||||
|
Globals.getInstance().set_args(sys.argv)
|
||||||
|
|
||||||
|
channel = Channel(index=1, role=1)
|
||||||
|
channel.settings.modem_config = 3
|
||||||
|
channel.settings.psk = b'\x01'
|
||||||
|
|
||||||
|
# Note: Intentionally do not have gpioValue in response as that is the
|
||||||
|
# default value
|
||||||
|
packet = {
|
||||||
|
'from': 682968668,
|
||||||
|
'to': 682968612,
|
||||||
|
'channel': 1,
|
||||||
|
'decoded': {
|
||||||
|
'portnum': 'REMOTE_HARDWARE_APP',
|
||||||
|
'payload': b'\x08\x05\x18\x80 ',
|
||||||
|
'requestId': 1629980484,
|
||||||
|
'remotehw': {
|
||||||
|
'typ': 'READ_GPIOS_REPLY',
|
||||||
|
'raw': 'faked',
|
||||||
|
'id': 1693085229,
|
||||||
|
'rxTime': 1640294262,
|
||||||
|
'rxSnr': 4.75,
|
||||||
|
'hopLimit': 3,
|
||||||
|
'wantAck': True,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
iface = MagicMock(autospec=SerialInterface)
|
iface = MagicMock(autospec=SerialInterface)
|
||||||
iface.localNode.getChannelByName.return_value = channel
|
iface.localNode.getChannelByName.return_value = channel
|
||||||
@@ -1803,14 +1849,112 @@ def test_main_gpio_rd(caplog, capsys):
|
|||||||
with caplog.at_level(logging.DEBUG):
|
with caplog.at_level(logging.DEBUG):
|
||||||
main()
|
main()
|
||||||
onGPIOreceive(packet, mo)
|
onGPIOreceive(packet, mo)
|
||||||
assert re.search(r'readGPIOs nodeid:!1234 mask:4096', caplog.text, re.MULTILINE)
|
|
||||||
out, err = capsys.readouterr()
|
out, err = capsys.readouterr()
|
||||||
assert re.search(r'Connected to radio', out, re.MULTILINE)
|
assert re.search(r'Connected to radio', out, re.MULTILINE)
|
||||||
assert re.search(r'Reading GPIO mask 0x1000 ', out, re.MULTILINE)
|
assert re.search(r'Reading GPIO mask 0x1000 ', out, re.MULTILINE)
|
||||||
assert re.search(r'Received RemoteHardware typ=READ_GPIOS_REPLY, gpio_value=4096', out, re.MULTILINE)
|
assert re.search(r'Received RemoteHardware typ=READ_GPIOS_REPLY, gpio_value=0', out, re.MULTILINE)
|
||||||
assert err == ''
|
assert err == ''
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.unit
|
||||||
|
@pytest.mark.usefixtures("reset_globals")
|
||||||
|
def test_main_gpio_watch(caplog, capsys):
|
||||||
|
"""Test --gpio_watch with a named gpio channel"""
|
||||||
|
sys.argv = ['', '--gpio-watch', '0x1000', '--dest', '!1234']
|
||||||
|
Globals.getInstance().set_args(sys.argv)
|
||||||
|
|
||||||
|
def my_sleep(amount):
|
||||||
|
print(f'{amount}')
|
||||||
|
sys.exit(3)
|
||||||
|
|
||||||
|
channel = Channel(index=1, role=1)
|
||||||
|
channel.settings.modem_config = 3
|
||||||
|
channel.settings.psk = b'\x01'
|
||||||
|
|
||||||
|
packet = {
|
||||||
|
|
||||||
|
'from': 682968668,
|
||||||
|
'to': 682968612,
|
||||||
|
'channel': 1,
|
||||||
|
'decoded': {
|
||||||
|
'portnum': 'REMOTE_HARDWARE_APP',
|
||||||
|
'payload': b'\x08\x05\x18\x80 ',
|
||||||
|
'requestId': 1629980484,
|
||||||
|
'remotehw': {
|
||||||
|
'typ': 'READ_GPIOS_REPLY',
|
||||||
|
'gpioValue': '4096',
|
||||||
|
'raw': 'faked',
|
||||||
|
'id': 1693085229,
|
||||||
|
'rxTime': 1640294262,
|
||||||
|
'rxSnr': 4.75,
|
||||||
|
'hopLimit': 3,
|
||||||
|
'wantAck': True,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
with patch('time.sleep', side_effect=my_sleep):
|
||||||
|
with pytest.raises(SystemExit) as pytest_wrapped_e:
|
||||||
|
iface = MagicMock(autospec=SerialInterface)
|
||||||
|
iface.localNode.getChannelByName.return_value = channel
|
||||||
|
with patch('meshtastic.serial_interface.SerialInterface', return_value=iface) as mo:
|
||||||
|
with caplog.at_level(logging.DEBUG):
|
||||||
|
main()
|
||||||
|
onGPIOreceive(packet, mo)
|
||||||
|
assert pytest_wrapped_e.type == SystemExit
|
||||||
|
assert pytest_wrapped_e.value.code == 3
|
||||||
|
out, err = capsys.readouterr()
|
||||||
|
assert re.search(r'Connected to radio', out, re.MULTILINE)
|
||||||
|
assert re.search(r'Watching GPIO mask 0x1000 ', out, re.MULTILINE)
|
||||||
|
assert err == ''
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.unit
|
||||||
|
@pytest.mark.usefixtures("reset_globals")
|
||||||
|
def test_main_gpio_wrb(caplog, capsys):
|
||||||
|
"""Test --gpio_wrb with a named gpio channel"""
|
||||||
|
sys.argv = ['', '--gpio-wrb', '4', '1', '--dest', '!1234']
|
||||||
|
Globals.getInstance().set_args(sys.argv)
|
||||||
|
|
||||||
|
channel = Channel(index=1, role=1)
|
||||||
|
channel.settings.modem_config = 3
|
||||||
|
channel.settings.psk = b'\x01'
|
||||||
|
|
||||||
|
packet = {
|
||||||
|
|
||||||
|
'from': 682968668,
|
||||||
|
'to': 682968612,
|
||||||
|
'channel': 1,
|
||||||
|
'decoded': {
|
||||||
|
'portnum': 'REMOTE_HARDWARE_APP',
|
||||||
|
'payload': b'\x08\x05\x18\x80 ',
|
||||||
|
'requestId': 1629980484,
|
||||||
|
'remotehw': {
|
||||||
|
'typ': 'READ_GPIOS_REPLY',
|
||||||
|
'gpioValue': '16',
|
||||||
|
'raw': 'faked',
|
||||||
|
'id': 1693085229,
|
||||||
|
'rxTime': 1640294262,
|
||||||
|
'rxSnr': 4.75,
|
||||||
|
'hopLimit': 3,
|
||||||
|
'wantAck': True,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
iface = MagicMock(autospec=SerialInterface)
|
||||||
|
iface.localNode.getChannelByName.return_value = channel
|
||||||
|
with patch('meshtastic.serial_interface.SerialInterface', return_value=iface) as mo:
|
||||||
|
with caplog.at_level(logging.DEBUG):
|
||||||
|
main()
|
||||||
|
onGPIOreceive(packet, mo)
|
||||||
|
out, err = capsys.readouterr()
|
||||||
|
assert re.search(r'Connected to radio', out, re.MULTILINE)
|
||||||
|
assert re.search(r'Writing GPIO mask 0x10 with value 0x10 to !1234', out, re.MULTILINE)
|
||||||
|
assert re.search(r'Received RemoteHardware typ=READ_GPIOS_REPLY, gpio_value=16 value=0', out, re.MULTILINE)
|
||||||
|
assert err == ''
|
||||||
|
|
||||||
@pytest.mark.unit
|
@pytest.mark.unit
|
||||||
@pytest.mark.usefixtures("reset_globals")
|
@pytest.mark.usefixtures("reset_globals")
|
||||||
def test_main_getPref_valid_field(capsys):
|
def test_main_getPref_valid_field(capsys):
|
||||||
|
|||||||
2
proto
2
proto
Submodule proto updated: 785fb20a0d...07ed86d8b4
2
setup.py
2
setup.py
@@ -12,7 +12,7 @@ with open("README.md", "r") as fh:
|
|||||||
# This call to setup() does all the work
|
# This call to setup() does all the work
|
||||||
setup(
|
setup(
|
||||||
name="meshtastic",
|
name="meshtastic",
|
||||||
version="1.2.76",
|
version="1.2.77",
|
||||||
description="Python API & client shell for talking to Meshtastic devices",
|
description="Python API & client shell for talking to Meshtastic devices",
|
||||||
long_description=long_description,
|
long_description=long_description,
|
||||||
long_description_content_type="text/markdown",
|
long_description_content_type="text/markdown",
|
||||||
|
|||||||
Reference in New Issue
Block a user