lock down hardware access

This commit is contained in:
Kevin Hester
2021-03-13 12:40:31 +08:00
parent aae2a03b1b
commit b0f6db56bd

View File

@@ -2,14 +2,16 @@
from . import portnums_pb2, remote_hardware_pb2
from pubsub import pub
def onGPIOreceive(packet, interface):
"""Callback for received GPIO responses
FIXME figure out how to do closures with methods in python"""
pb = remote_hardware_pb2.HardwareMessage()
pb.ParseFromString(packet["decoded"]["data"]["payload"])
print(f"Received RemoteHardware typ={pb.typ}, gpio_value={pb.gpio_value}")
class RemoteHardwareClient:
"""
This is the client code to control/monitor simple hardware built into the
@@ -24,8 +26,17 @@ class RemoteHardwareClient:
iface is the already open MeshInterface instance
"""
self.iface = iface
ch = iface.localNode.getChannelByName("gpio")
if not ch:
raise Exception(
"No gpio channel found, please create before using this (secured) service")
self.channelIndex = ch.index
pub.subscribe(onGPIOreceive, "meshtastic.receive.data.REMOTE_HARDWARE_APP")
pub.subscribe(
onGPIOreceive, "meshtastic.receive.data.REMOTE_HARDWARE_APP")
def _sendHardware(self, nodeid, r):
return self.iface.sendData(r, nodeid, portnums_pb2.REMOTE_HARDWARE_APP, wantAck=True, channelIndex=self.channelIndex)
def writeGPIOs(self, nodeid, mask, vals):
"""
@@ -36,18 +47,18 @@ class RemoteHardwareClient:
r.typ = remote_hardware_pb2.HardwareMessage.Type.WRITE_GPIOS
r.gpio_mask = mask
r.gpio_value = vals
return self.iface.sendData(r, nodeid, portnums_pb2.REMOTE_HARDWARE_APP, wantAck = True)
return self._sendHardware(nodeid, r)
def readGPIOs(self, nodeid, mask):
"""Read the specified bits from GPIO inputs on the device"""
r = remote_hardware_pb2.HardwareMessage()
r.typ = remote_hardware_pb2.HardwareMessage.Type.READ_GPIOS
r.gpio_mask = mask
return self.iface.sendData(r, nodeid, portnums_pb2.REMOTE_HARDWARE_APP, wantAck = True)
return self._sendHardware(nodeid, r)
def watchGPIOs(self, nodeid, mask):
"""Watch the specified bits from GPIO inputs on the device for changes"""
r = remote_hardware_pb2.HardwareMessage()
r.typ = remote_hardware_pb2.HardwareMessage.Type.WATCH_GPIOS
r.gpio_mask = mask
return self.iface.sendData(r, nodeid, portnums_pb2.REMOTE_HARDWARE_APP, wantAck = True)
return self._sendHardware(nodeid, r)