mirror of
https://github.com/meshtastic/python.git
synced 2025-12-31 03:47:55 -05:00
65 lines
2.3 KiB
Python
65 lines
2.3 KiB
Python
|
|
from . import portnums_pb2, remote_hardware_pb2
|
|
from pubsub import pub
|
|
|
|
|
|
def onGPIOreceive(packet, interface):
    """Print the contents of a received RemoteHardware (GPIO) response.

    The raw bytes found at packet["decoded"]["data"]["payload"] are parsed
    as a HardwareMessage, and its typ and gpio_value fields are printed.
    The interface argument is accepted but not used by this function.

    FIXME figure out how to do closures with methods in python
    """
    hardware_msg = remote_hardware_pb2.HardwareMessage()
    raw_payload = packet["decoded"]["data"]["payload"]
    hardware_msg.ParseFromString(raw_payload)
    print(
        f"Received RemoteHardware typ={hardware_msg.typ}, gpio_value={hardware_msg.gpio_value}")
|
|
|
|
|
|
class RemoteHardwareClient:
    """
    Client for controlling and monitoring the simple hardware (GPIOs) built
    into meshtastic devices.

    Besides being a usable API/service, this class is meant to serve as
    example code showing how to talk to your own custom meshtastic services.
    """

    def __init__(self, iface):
        """
        Bind the client to an interface and its "gpio" channel.

        iface is the already open MeshInterface instance.

        Raises Exception when the local node has no channel named "gpio".
        As a side effect, onGPIOreceive is subscribed to the
        "meshtastic.receive.data.REMOTE_HARDWARE_APP" pubsub topic.
        """
        self.iface = iface

        gpio_channel = iface.localNode.getChannelByName("gpio")
        if not gpio_channel:
            raise Exception(
                "No gpio channel found, please create before using this (secured) service")
        # Every hardware request is sent on this channel (see _sendHardware).
        self.channelIndex = gpio_channel.index

        pub.subscribe(
            onGPIOreceive, "meshtastic.receive.data.REMOTE_HARDWARE_APP")

    @staticmethod
    def _newRequest(typ, mask):
        """Create a HardwareMessage with its typ and gpio_mask fields set."""
        request = remote_hardware_pb2.HardwareMessage()
        request.typ = typ
        request.gpio_mask = mask
        return request

    def _sendHardware(self, nodeid, r):
        """
        Send the HardwareMessage r to nodeid on the gpio channel.

        The message goes out on the REMOTE_HARDWARE_APP port with an ack
        requested; whatever iface.sendData returns is passed back.
        """
        return self.iface.sendData(
            r,
            nodeid,
            portnums_pb2.REMOTE_HARDWARE_APP,
            wantAck=True,
            channelIndex=self.channelIndex,
        )

    def writeGPIOs(self, nodeid, mask, vals):
        """
        Write the specified vals bits to the device GPIOs. Only bits in mask
        that are 1 will be changed.
        """
        request = self._newRequest(
            remote_hardware_pb2.HardwareMessage.Type.WRITE_GPIOS, mask)
        request.gpio_value = vals
        return self._sendHardware(nodeid, request)

    def readGPIOs(self, nodeid, mask):
        """Read the specified bits from GPIO inputs on the device."""
        request = self._newRequest(
            remote_hardware_pb2.HardwareMessage.Type.READ_GPIOS, mask)
        return self._sendHardware(nodeid, request)

    def watchGPIOs(self, nodeid, mask):
        """Watch the specified bits from GPIO inputs on the device for changes."""
        request = self._newRequest(
            remote_hardware_pb2.HardwareMessage.Type.WATCH_GPIOS, mask)
        return self._sendHardware(nodeid, request)
|