use protocol decoders for all message types (even custom ones)

This commit is contained in:
Kevin Hester
2021-02-26 18:44:02 +08:00
parent f30f97d949
commit 59517b4b49

View File

@@ -101,12 +101,46 @@ class KnownProtocol(NamedTuple):
"""Used to automatically decode known protocol payloads"""
name: str
# portnum: int, now a key
protobufFactory: Callable
protobufFactory: Callable = None # If set, will be called to prase as a protocol buffer
onReceive: Callable = None # If set, invoked as onReceive(interface, packet)
def _onTextReceive(iface, asDict):
"""Special text auto parsing for received messages"""
# We don't throw if the utf8 is invalid in the text message. Instead we just don't populate
# the decoded.data.text and we log an error message. This at least allows some delivery to
# the app and the app can deal with the missing decoded representation.
#
# Usually btw this problem is caused by apps sending binary data but setting the payload type to
# text.
try:
asDict["decoded"]["text"] = meshPacket.decoded.payload.decode(
"utf-8")
except Exception as ex:
logging.error(f"Malformatted utf8 in text message: {ex}")
def _onPositionReceive(iface, asDict):
"""Special auto parsing for received messages"""
p = asDict["decoded"]["position"]
iface._fixupPosition(p)
# update node DB as needed
iface._getOrCreateByNum(asDict["from"])["position"] = p
def _onNodeInfoReceive(iface, asDict):
"""Special auto parsing for received messages"""
p = asDict["decoded"]["user"]
# decode user protobufs and update nodedb, provide decoded version as "position" in the published msg
# update node DB as needed
n = iface._getOrCreateByNum(asDict["from"])
n["user"] = p
# We now have a node ID, make sure it is uptodate in that table
iface.nodes[p["id"]] = n
"""Well known message payloads can register decoders for automatic protobuf parsing"""
protocols = {
portnums_pb2.PortNum.POSITION_APP: KnownProtocol("position", mesh_pb2.Position),
portnums_pb2.PortNum.NODEINFO_APP: KnownProtocol("user", mesh_pb2.User),
portnums_pb2.PortNum.TEXT_MESSAGE_APP: KnownProtocol("text", onReceive=_onTextReceive),
portnums_pb2.PortNum.POSITION_APP: KnownProtocol("position", mesh_pb2.Position, _onPositionReceive),
portnums_pb2.PortNum.NODEINFO_APP: KnownProtocol("user", mesh_pb2.User, _onNodeInfoReceive),
portnums_pb2.PortNum.ADMIN_APP: KnownProtocol("admin", admin_pb2.AdminMessage),
portnums_pb2.PortNum.ENVIRONMENTAL_MEASUREMENT_APP: KnownProtocol("environmental", environmental_measurement_pb2.EnvironmentalMeasurement),
portnums_pb2.PortNum.REMOTE_HARDWARE_APP: KnownProtocol("remotehw", remote_hardware_pb2.HardwareMessage)
@@ -541,22 +575,6 @@ class MeshInterface:
topic = f"meshtastic.receive.data.{portnum}"
# For text messages, we go ahead and decode the text to ascii for our users
if portnum == portnums_pb2.PortNum.Name(portnums_pb2.PortNum.TEXT_MESSAGE_APP):
topic = "meshtastic.receive.text"
# We don't throw if the utf8 is invalid in the text message. Instead we just don't populate
# the decoded.data.text and we log an error message. This at least allows some delivery to
# the app and the app can deal with the missing decoded representation.
#
# Usually btw this problem is caused by apps sending binary data but setting the payload type to
# text.
try:
asDict["decoded"]["text"] = meshPacket.decoded.payload.decode(
"utf-8")
except Exception as ex:
logging.error(f"Malformatted utf8 in text message: {ex}")
# decode position protobufs and update nodedb, provide decoded version as "position" in the published msg
# move the following into a 'decoders' API that clients could register?
portNumInt = meshPacket.decoded.portnum # we want portnum as an int
@@ -564,23 +582,17 @@ class MeshInterface:
p = None # The decoded protobuf as a dictionary (if we understand this message)
if handler is not None:
topic = f"meshtastic.receive.{handler.name}"
pb = handler.protobufFactory()
pb.ParseFromString(meshPacket.decoded.payload)
p = google.protobuf.json_format.MessageToDict(pb)
asDict["decoded"][handler.name] = p
if portNumInt == portnums_pb2.PortNum.POSITION_APP:
self._fixupPosition(p)
# update node DB as needed
self._getOrCreateByNum(asDict["from"])["position"] = p
# Convert to protobuf if possible
if handler.protobufFactory is not None:
pb = handler.protobufFactory()
pb.ParseFromString(meshPacket.decoded.payload)
p = google.protobuf.json_format.MessageToDict(pb)
asDict["decoded"][handler.name] = p
# decode user protobufs and update nodedb, provide decoded version as "position" in the published msg
if portNumInt == portnums_pb2.PortNum.NODEINFO_APP:
# update node DB as needed
n = self._getOrCreateByNum(asDict["from"])
n["user"] = p
# We now have a node ID, make sure it is uptodate in that table
self.nodes[p["id"]] = n
# Call specialized onReceive if necessary
if handler.onReceive is not None:
handler.onReceive(self, asDict)
logging.debug(f"Publishing topic {topic}")
catchAndIgnore(f"publishing {topic}", lambda: pub.sendMessage(