mirror of
https://github.com/meshtastic/python.git
synced 2026-04-17 13:31:43 -04:00
requestConfig was assuming that the order of fields in meshtastic.LocalConfig matches the order of enum values in AdminMessage.ConfigType. This is true for 'device', 'position', etc. but is NOT true for 'security' due to the intervening 'version' field. Look up LocalConfig fields by name, not index, to prevent this error in the future. LocalConfig.security was introduced in https://github.com/meshtastic/protobufs/pull/553
1061 lines
42 KiB
Python
1061 lines
42 KiB
Python
"""Node class
|
|
"""
|
|
|
|
import base64
|
|
import logging
|
|
import time
|
|
|
|
from typing import Optional, Union, List
|
|
|
|
from meshtastic.protobuf import admin_pb2, apponly_pb2, channel_pb2, config_pb2, localonly_pb2, mesh_pb2, portnums_pb2
|
|
from meshtastic.util import (
|
|
Timeout,
|
|
camel_to_snake,
|
|
fromPSK,
|
|
our_exit,
|
|
pskToString,
|
|
stripnl,
|
|
message_to_json,
|
|
generate_channel_hash,
|
|
to_node_num,
|
|
flags_to_list,
|
|
)
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
class Node:
|
|
"""A model of a (local or remote) node in the mesh
|
|
|
|
Includes methods for localConfig, moduleConfig and channels
|
|
"""
|
|
|
|
def __init__(self, iface, nodeNum, noProto=False, timeout: int = 300):
|
|
"""Constructor"""
|
|
self.iface = iface
|
|
self.nodeNum = nodeNum
|
|
self.localConfig = localonly_pb2.LocalConfig()
|
|
self.moduleConfig = localonly_pb2.LocalModuleConfig()
|
|
self.channels = None
|
|
self._timeout = Timeout(maxSecs=timeout)
|
|
self.partialChannels: Optional[List] = None
|
|
self.noProto = noProto
|
|
self.cannedPluginMessage = None
|
|
self.cannedPluginMessageMessages = None
|
|
self.ringtone = None
|
|
self.ringtonePart = None
|
|
|
|
self.gotResponse = None
|
|
|
|
def __repr__(self):
|
|
r = f"Node({self.iface!r}, 0x{self.nodeNum:08x}"
|
|
if self.noProto:
|
|
r += ", noProto=True"
|
|
if self._timeout.expireTimeout != 300:
|
|
r += ", timeout={self._timeout.expireTimeout!r}"
|
|
r += ")"
|
|
return r
|
|
|
|
@staticmethod
|
|
def position_flags_list(position_flags: int) -> List[str]:
|
|
"Return a list of position flags from the given flags integer"
|
|
return flags_to_list(config_pb2.Config.PositionConfig.PositionFlags, position_flags)
|
|
|
|
@staticmethod
|
|
def excluded_modules_list(excluded_modules: int) -> List[str]:
|
|
"Return a list of excluded modules from the given flags integer"
|
|
return flags_to_list(mesh_pb2.ExcludedModules, excluded_modules)
|
|
|
|
def module_available(self, excluded_bit: int) -> bool:
|
|
"""Check DeviceMetadata.excluded_modules to see if a module is available."""
|
|
meta = getattr(self.iface, "metadata", None)
|
|
if meta is None:
|
|
return True
|
|
try:
|
|
return (meta.excluded_modules & excluded_bit) == 0
|
|
except Exception:
|
|
return True
|
|
|
|
def showChannels(self):
|
|
"""Show human readable description of our channels."""
|
|
print("Channels:")
|
|
if self.channels:
|
|
logger.debug(f"self.channels:{self.channels}")
|
|
for c in self.channels:
|
|
cStr = message_to_json(c.settings)
|
|
# don't show disabled channels
|
|
if channel_pb2.Channel.Role.Name(c.role) != "DISABLED":
|
|
print(
|
|
f" Index {c.index}: {channel_pb2.Channel.Role.Name(c.role)} psk={pskToString(c.settings.psk)} {cStr}"
|
|
)
|
|
publicURL = self.getURL(includeAll=False)
|
|
adminURL = self.getURL(includeAll=True)
|
|
print(f"\nPrimary channel URL: {publicURL}")
|
|
if adminURL != publicURL:
|
|
print(f"Complete URL (includes all channels): {adminURL}")
|
|
|
|
def showInfo(self):
|
|
"""Show human readable description of our node"""
|
|
prefs = ""
|
|
if self.localConfig:
|
|
prefs = message_to_json(self.localConfig, multiline=True)
|
|
print(f"Preferences: {prefs}\n")
|
|
prefs = ""
|
|
if self.moduleConfig:
|
|
prefs = message_to_json(self.moduleConfig, multiline=True)
|
|
print(f"Module preferences: {prefs}\n")
|
|
self.showChannels()
|
|
|
|
def setChannels(self, channels):
|
|
"""Set the channels for this node"""
|
|
self.channels = channels
|
|
self._fixupChannels()
|
|
|
|
def requestChannels(self, startingIndex: int = 0):
|
|
"""Send regular MeshPackets to ask channels."""
|
|
logger.debug(f"requestChannels for nodeNum:{self.nodeNum}")
|
|
# only initialize if we're starting out fresh
|
|
if startingIndex == 0:
|
|
self.channels = None
|
|
self.partialChannels = [] # We keep our channels in a temp array until finished
|
|
self._requestChannel(startingIndex)
|
|
|
|
def onResponseRequestSettings(self, p):
|
|
"""Handle the response packets for requesting settings _requestSettings()"""
|
|
logger.debug(f"onResponseRequestSetting() p:{p}")
|
|
config_values = None
|
|
if "routing" in p["decoded"]:
|
|
if p["decoded"]["routing"]["errorReason"] != "NONE":
|
|
print(f'Error on response: {p["decoded"]["routing"]["errorReason"]}')
|
|
self.iface._acknowledgment.receivedNak = True
|
|
else:
|
|
self.iface._acknowledgment.receivedAck = True
|
|
print("")
|
|
adminMessage = p["decoded"]["admin"]
|
|
if "getConfigResponse" in adminMessage:
|
|
oneof = "get_config_response"
|
|
resp = adminMessage["getConfigResponse"]
|
|
field = list(resp.keys())[0]
|
|
config_type = self.localConfig.DESCRIPTOR.fields_by_name.get(
|
|
camel_to_snake(field)
|
|
)
|
|
if config_type is not None:
|
|
config_values = getattr(self.localConfig, config_type.name)
|
|
elif "getModuleConfigResponse" in adminMessage:
|
|
oneof = "get_module_config_response"
|
|
resp = adminMessage["getModuleConfigResponse"]
|
|
field = list(resp.keys())[0]
|
|
config_type = self.moduleConfig.DESCRIPTOR.fields_by_name.get(
|
|
camel_to_snake(field)
|
|
)
|
|
config_values = getattr(self.moduleConfig, config_type.name)
|
|
else:
|
|
print(
|
|
"Did not receive a valid response. Make sure to have a shared channel named 'admin'."
|
|
)
|
|
return
|
|
if config_values is not None:
|
|
raw_config = getattr(getattr(adminMessage['raw'], oneof), camel_to_snake(field))
|
|
config_values.CopyFrom(raw_config)
|
|
print(f"{str(camel_to_snake(field))}:\n{str(config_values)}")
|
|
|
|
def requestConfig(self, configType):
|
|
"""Request the config from the node via admin message"""
|
|
if self == self.iface.localNode:
|
|
onResponse = None
|
|
else:
|
|
onResponse = self.onResponseRequestSettings
|
|
print("Requesting current config from remote node (this can take a while).")
|
|
p = admin_pb2.AdminMessage()
|
|
if isinstance(configType, int):
|
|
p.get_config_request = configType
|
|
|
|
else:
|
|
if configType.containing_type.name == "LocalConfig":
|
|
p.get_config_request = admin_pb2.AdminMessage.ConfigType.Value(configType.name.upper() + "_CONFIG")
|
|
else:
|
|
p.get_module_config_request = configType.index
|
|
|
|
self._sendAdmin(p, wantResponse=True, onResponse=onResponse)
|
|
if onResponse:
|
|
self.iface.waitForAckNak()
|
|
|
|
def turnOffEncryptionOnPrimaryChannel(self):
|
|
"""Turn off encryption on primary channel."""
|
|
self.channels[0].settings.psk = fromPSK("none")
|
|
print("Writing modified channels to device")
|
|
self.writeChannel(0)
|
|
|
|
def waitForConfig(self, attribute="channels"):
|
|
"""Block until radio config is received. Returns True if config has been received."""
|
|
return self._timeout.waitForSet(self, attrs=("localConfig", attribute))
|
|
|
|
def writeConfig(self, config_name):
|
|
"""Write the current (edited) localConfig to the device"""
|
|
if self.localConfig is None:
|
|
our_exit("Error: No localConfig has been read")
|
|
|
|
p = admin_pb2.AdminMessage()
|
|
|
|
if config_name == "device":
|
|
p.set_config.device.CopyFrom(self.localConfig.device)
|
|
elif config_name == "position":
|
|
p.set_config.position.CopyFrom(self.localConfig.position)
|
|
elif config_name == "power":
|
|
p.set_config.power.CopyFrom(self.localConfig.power)
|
|
elif config_name == "network":
|
|
p.set_config.network.CopyFrom(self.localConfig.network)
|
|
elif config_name == "display":
|
|
p.set_config.display.CopyFrom(self.localConfig.display)
|
|
elif config_name == "lora":
|
|
p.set_config.lora.CopyFrom(self.localConfig.lora)
|
|
elif config_name == "bluetooth":
|
|
p.set_config.bluetooth.CopyFrom(self.localConfig.bluetooth)
|
|
elif config_name == "security":
|
|
p.set_config.security.CopyFrom(self.localConfig.security)
|
|
elif config_name == "mqtt":
|
|
p.set_module_config.mqtt.CopyFrom(self.moduleConfig.mqtt)
|
|
elif config_name == "serial":
|
|
p.set_module_config.serial.CopyFrom(self.moduleConfig.serial)
|
|
elif config_name == "external_notification":
|
|
p.set_module_config.external_notification.CopyFrom(
|
|
self.moduleConfig.external_notification
|
|
)
|
|
elif config_name == "store_forward":
|
|
p.set_module_config.store_forward.CopyFrom(self.moduleConfig.store_forward)
|
|
elif config_name == "range_test":
|
|
p.set_module_config.range_test.CopyFrom(self.moduleConfig.range_test)
|
|
elif config_name == "telemetry":
|
|
p.set_module_config.telemetry.CopyFrom(self.moduleConfig.telemetry)
|
|
elif config_name == "canned_message":
|
|
p.set_module_config.canned_message.CopyFrom(
|
|
self.moduleConfig.canned_message
|
|
)
|
|
elif config_name == "audio":
|
|
p.set_module_config.audio.CopyFrom(self.moduleConfig.audio)
|
|
elif config_name == "remote_hardware":
|
|
p.set_module_config.remote_hardware.CopyFrom(
|
|
self.moduleConfig.remote_hardware
|
|
)
|
|
elif config_name == "neighbor_info":
|
|
p.set_module_config.neighbor_info.CopyFrom(self.moduleConfig.neighbor_info)
|
|
elif config_name == "detection_sensor":
|
|
p.set_module_config.detection_sensor.CopyFrom(self.moduleConfig.detection_sensor)
|
|
elif config_name == "ambient_lighting":
|
|
p.set_module_config.ambient_lighting.CopyFrom(self.moduleConfig.ambient_lighting)
|
|
elif config_name == "paxcounter":
|
|
p.set_module_config.paxcounter.CopyFrom(self.moduleConfig.paxcounter)
|
|
else:
|
|
our_exit(f"Error: No valid config with name {config_name}")
|
|
|
|
logger.debug(f"Wrote: {config_name}")
|
|
if self == self.iface.localNode:
|
|
onResponse = None
|
|
else:
|
|
onResponse = self.onAckNak
|
|
self._sendAdmin(p, onResponse=onResponse)
|
|
|
|
def writeChannel(self, channelIndex, adminIndex=0):
|
|
"""Write the current (edited) channel to the device"""
|
|
self.ensureSessionKey()
|
|
p = admin_pb2.AdminMessage()
|
|
p.set_channel.CopyFrom(self.channels[channelIndex])
|
|
self._sendAdmin(p, adminIndex=adminIndex)
|
|
logger.debug(f"Wrote channel {channelIndex}")
|
|
|
|
def getChannelByChannelIndex(self, channelIndex):
|
|
"""Get channel by channelIndex
|
|
channelIndex: number, typically 0-7; based on max number channels
|
|
returns: None if there is no channel found
|
|
"""
|
|
ch = None
|
|
if self.channels and 0 <= channelIndex < len(self.channels):
|
|
ch = self.channels[channelIndex]
|
|
return ch
|
|
|
|
def deleteChannel(self, channelIndex):
|
|
"""Delete the specified channelIndex and shift other channels up"""
|
|
ch = self.channels[channelIndex]
|
|
if ch.role not in (
|
|
channel_pb2.Channel.Role.SECONDARY,
|
|
channel_pb2.Channel.Role.DISABLED,
|
|
):
|
|
our_exit("Warning: Only SECONDARY channels can be deleted")
|
|
|
|
# we are careful here because if we move the "admin" channel the channelIndex we need to use
|
|
# for sending admin channels will also change
|
|
adminIndex = self.iface.localNode._getAdminChannelIndex()
|
|
|
|
self.channels.pop(channelIndex)
|
|
self._fixupChannels() # expand back to 8 channels
|
|
|
|
index = channelIndex
|
|
while index < 8:
|
|
self.writeChannel(index, adminIndex=adminIndex)
|
|
index += 1
|
|
|
|
# if we are updating the local node, we might end up
|
|
# *moving* the admin channel index as we are writing
|
|
if (self.iface.localNode == self) and index >= adminIndex:
|
|
# We've now passed the old location for admin index
|
|
# (and written it), so we can start finding it by name again
|
|
adminIndex = 0
|
|
|
|
def getChannelByName(self, name):
|
|
"""Try to find the named channel or return None"""
|
|
for c in self.channels or []:
|
|
if c.settings and c.settings.name == name:
|
|
return c
|
|
return None
|
|
|
|
def getDisabledChannel(self):
|
|
"""Return the first channel that is disabled (i.e. available for some new use)"""
|
|
for c in self.channels:
|
|
if c.role == channel_pb2.Channel.Role.DISABLED:
|
|
return c
|
|
return None
|
|
|
|
def _getAdminChannelIndex(self):
|
|
"""Return the channel number of the admin channel, or 0 if no reserved channel"""
|
|
for c in self.channels or []:
|
|
if c.settings and c.settings.name.lower() == "admin":
|
|
return c.index
|
|
return 0
|
|
|
|
def setOwner(self, long_name: Optional[str]=None, short_name: Optional[str]=None, is_licensed: bool=False, is_unmessagable: Optional[bool]=None):
|
|
"""Set device owner name"""
|
|
logger.debug(f"in setOwner nodeNum:{self.nodeNum}")
|
|
self.ensureSessionKey()
|
|
p = admin_pb2.AdminMessage()
|
|
|
|
nChars = 4
|
|
if long_name is not None:
|
|
long_name = long_name.strip()
|
|
# Validate that long_name is not empty or whitespace-only
|
|
if not long_name:
|
|
our_exit("ERROR: Long Name cannot be empty or contain only whitespace characters")
|
|
p.set_owner.long_name = long_name
|
|
p.set_owner.is_licensed = is_licensed
|
|
if short_name is not None:
|
|
short_name = short_name.strip()
|
|
# Validate that short_name is not empty or whitespace-only
|
|
if not short_name:
|
|
our_exit("ERROR: Short Name cannot be empty or contain only whitespace characters")
|
|
if len(short_name) > nChars:
|
|
short_name = short_name[:nChars]
|
|
print(f"Maximum is 4 characters, truncated to {short_name}")
|
|
p.set_owner.short_name = short_name
|
|
if is_unmessagable is not None:
|
|
p.set_owner.is_unmessagable = is_unmessagable
|
|
|
|
# Note: These debug lines are used in unit tests
|
|
logger.debug(f"p.set_owner.long_name:{p.set_owner.long_name}:")
|
|
logger.debug(f"p.set_owner.short_name:{p.set_owner.short_name}:")
|
|
logger.debug(f"p.set_owner.is_licensed:{p.set_owner.is_licensed}")
|
|
logger.debug(f"p.set_owner.is_unmessagable:{p.set_owner.is_unmessagable}:")
|
|
# If sending to a remote node, wait for ACK/NAK
|
|
if self == self.iface.localNode:
|
|
onResponse = None
|
|
else:
|
|
onResponse = self.onAckNak
|
|
return self._sendAdmin(p, onResponse=onResponse)
|
|
|
|
def getURL(self, includeAll: bool = True):
|
|
"""The sharable URL that describes the current channel"""
|
|
# Only keep the primary/secondary channels, assume primary is first
|
|
channelSet = apponly_pb2.ChannelSet()
|
|
if self.channels:
|
|
for c in self.channels:
|
|
if c.role == channel_pb2.Channel.Role.PRIMARY or (
|
|
includeAll and c.role == channel_pb2.Channel.Role.SECONDARY
|
|
):
|
|
channelSet.settings.append(c.settings)
|
|
|
|
if len(self.localConfig.ListFields()) == 0:
|
|
self.requestConfig(self.localConfig.DESCRIPTOR.fields_by_name.get('lora'))
|
|
channelSet.lora_config.CopyFrom(self.localConfig.lora)
|
|
some_bytes = channelSet.SerializeToString()
|
|
s = base64.urlsafe_b64encode(some_bytes).decode("ascii")
|
|
s = s.replace("=", "").replace("+", "-").replace("/", "_")
|
|
return f"https://meshtastic.org/e/#{s}"
|
|
|
|
def setURL(self, url: str, addOnly: bool = False):
|
|
"""Set mesh network URL"""
|
|
if self.localConfig is None or self.channels is None:
|
|
our_exit("Warning: config or channels not loaded")
|
|
|
|
# URLs are of the form https://meshtastic.org/d/#{base64_channel_set}
|
|
# Split on '/#' to find the base64 encoded channel settings
|
|
if addOnly:
|
|
splitURL = url.split("/?add=true#")
|
|
else:
|
|
splitURL = url.split("/#")
|
|
if len(splitURL) == 1:
|
|
our_exit(f"Warning: Invalid URL '{url}'")
|
|
b64 = splitURL[-1]
|
|
|
|
# We normally strip padding to make for a shorter URL, but the python parser doesn't like
|
|
# that. So add back any missing padding
|
|
# per https://stackoverflow.com/a/9807138
|
|
missing_padding = len(b64) % 4
|
|
if missing_padding:
|
|
b64 += "=" * (4 - missing_padding)
|
|
|
|
decodedURL = base64.urlsafe_b64decode(b64)
|
|
channelSet = apponly_pb2.ChannelSet()
|
|
channelSet.ParseFromString(decodedURL)
|
|
|
|
if len(channelSet.settings) == 0:
|
|
our_exit("Warning: There were no settings.")
|
|
|
|
if addOnly:
|
|
# Add new channels with names not already present
|
|
# Don't change existing channels
|
|
for chs in channelSet.settings:
|
|
channelExists = self.getChannelByName(chs.name)
|
|
if channelExists or chs.name == "":
|
|
print(f"Ignoring existing or empty channel \"{chs.name}\" from add URL")
|
|
continue
|
|
ch = self.getDisabledChannel()
|
|
if not ch:
|
|
our_exit("Warning: No free channels were found")
|
|
ch.settings.CopyFrom(chs)
|
|
ch.role = channel_pb2.Channel.Role.SECONDARY
|
|
print(f"Adding new channel '{chs.name}' to device")
|
|
self.writeChannel(ch.index)
|
|
else:
|
|
i = 0
|
|
for chs in channelSet.settings:
|
|
ch = channel_pb2.Channel()
|
|
ch.role = (
|
|
channel_pb2.Channel.Role.PRIMARY
|
|
if i == 0
|
|
else channel_pb2.Channel.Role.SECONDARY
|
|
)
|
|
ch.index = i
|
|
ch.settings.CopyFrom(chs)
|
|
self.channels[ch.index] = ch
|
|
logger.debug(f"Channel i:{i} ch:{ch}")
|
|
self.writeChannel(ch.index)
|
|
i = i + 1
|
|
|
|
p = admin_pb2.AdminMessage()
|
|
p.set_config.lora.CopyFrom(channelSet.lora_config)
|
|
self.ensureSessionKey()
|
|
self._sendAdmin(p)
|
|
|
|
def onResponseRequestRingtone(self, p):
|
|
"""Handle the response packet for requesting ringtone part 1"""
|
|
logger.debug(f"onResponseRequestRingtone() p:{p}")
|
|
errorFound = False
|
|
if "routing" in p["decoded"]:
|
|
if p["decoded"]["routing"]["errorReason"] != "NONE":
|
|
errorFound = True
|
|
print(f'Error on response: {p["decoded"]["routing"]["errorReason"]}')
|
|
if errorFound is False:
|
|
if "decoded" in p:
|
|
if "admin" in p["decoded"]:
|
|
if "raw" in p["decoded"]["admin"]:
|
|
self.ringtonePart = p["decoded"]["admin"][
|
|
"raw"
|
|
].get_ringtone_response
|
|
logger.debug(f"self.ringtonePart:{self.ringtonePart}")
|
|
self.gotResponse = True
|
|
|
|
def get_ringtone(self):
|
|
"""Get the ringtone. Concatenate all pieces together and return a single string."""
|
|
logger.debug(f"in get_ringtone()")
|
|
if not self.module_available(mesh_pb2.EXTNOTIF_CONFIG):
|
|
logging.warning("External Notification module not present (excluded by firmware)")
|
|
return None
|
|
|
|
if not self.ringtone:
|
|
p1 = admin_pb2.AdminMessage()
|
|
p1.get_ringtone_request = True
|
|
self.gotResponse = False
|
|
self._sendAdmin(
|
|
p1, wantResponse=True, onResponse=self.onResponseRequestRingtone
|
|
)
|
|
while self.gotResponse is False:
|
|
time.sleep(0.1)
|
|
|
|
logger.debug(f"self.ringtone:{self.ringtone}")
|
|
|
|
self.ringtone = ""
|
|
if self.ringtonePart:
|
|
self.ringtone += self.ringtonePart
|
|
|
|
logger.debug(f"ringtone:{self.ringtone}")
|
|
return self.ringtone
|
|
|
|
def set_ringtone(self, ringtone):
|
|
"""Set the ringtone. The ringtone length must be less than 230 character."""
|
|
if not self.module_available(mesh_pb2.EXTNOTIF_CONFIG):
|
|
logging.warning("External Notification module not present (excluded by firmware)")
|
|
return None
|
|
|
|
if len(ringtone) > 230:
|
|
our_exit("Warning: The ringtone must be less than 230 characters.")
|
|
self.ensureSessionKey()
|
|
# split into chunks
|
|
chunks = []
|
|
chunks_size = 230
|
|
for i in range(0, len(ringtone), chunks_size):
|
|
chunks.append(ringtone[i : i + chunks_size])
|
|
|
|
# for each chunk, send a message to set the values
|
|
# for i in range(0, len(chunks)):
|
|
for i, chunk in enumerate(chunks):
|
|
p = admin_pb2.AdminMessage()
|
|
|
|
# TODO: should be a way to improve this
|
|
if i == 0:
|
|
p.set_ringtone_message = chunk
|
|
|
|
logger.debug(f"Setting ringtone '{chunk}' part {i+1}")
|
|
# If sending to a remote node, wait for ACK/NAK
|
|
if self == self.iface.localNode:
|
|
onResponse = None
|
|
else:
|
|
onResponse = self.onAckNak
|
|
return self._sendAdmin(p, onResponse=onResponse)
|
|
|
|
def onResponseRequestCannedMessagePluginMessageMessages(self, p):
|
|
"""Handle the response packet for requesting canned message plugin message part 1"""
|
|
logger.debug(f"onResponseRequestCannedMessagePluginMessageMessages() p:{p}")
|
|
errorFound = False
|
|
if "routing" in p["decoded"]:
|
|
if p["decoded"]["routing"]["errorReason"] != "NONE":
|
|
errorFound = True
|
|
print(f'Error on response: {p["decoded"]["routing"]["errorReason"]}')
|
|
if errorFound is False:
|
|
if "decoded" in p:
|
|
if "admin" in p["decoded"]:
|
|
if "raw" in p["decoded"]["admin"]:
|
|
self.cannedPluginMessageMessages = p["decoded"]["admin"][
|
|
"raw"
|
|
].get_canned_message_module_messages_response
|
|
logger.debug(
|
|
f"self.cannedPluginMessageMessages:{self.cannedPluginMessageMessages}"
|
|
)
|
|
self.gotResponse = True
|
|
|
|
def get_canned_message(self):
|
|
"""Get the canned message string. Concatenate all pieces together and return a single string."""
|
|
logger.debug(f"in get_canned_message()")
|
|
if not self.module_available(mesh_pb2.CANNEDMSG_CONFIG):
|
|
logging.warning("Canned Message module not present (excluded by firmware)")
|
|
return None
|
|
if not self.cannedPluginMessage:
|
|
p1 = admin_pb2.AdminMessage()
|
|
p1.get_canned_message_module_messages_request = True
|
|
self.gotResponse = False
|
|
self._sendAdmin(
|
|
p1,
|
|
wantResponse=True,
|
|
onResponse=self.onResponseRequestCannedMessagePluginMessageMessages,
|
|
)
|
|
while self.gotResponse is False:
|
|
time.sleep(0.1)
|
|
|
|
logger.debug(
|
|
f"self.cannedPluginMessageMessages:{self.cannedPluginMessageMessages}"
|
|
)
|
|
|
|
self.cannedPluginMessage = ""
|
|
if self.cannedPluginMessageMessages:
|
|
self.cannedPluginMessage += self.cannedPluginMessageMessages
|
|
|
|
logger.debug(f"canned_plugin_message:{self.cannedPluginMessage}")
|
|
return self.cannedPluginMessage
|
|
|
|
def set_canned_message(self, message):
|
|
"""Set the canned message. The canned messages length must be less than 200 character."""
|
|
if not self.module_available(mesh_pb2.CANNEDMSG_CONFIG):
|
|
logging.warning("Canned Message module not present (excluded by firmware)")
|
|
return None
|
|
|
|
if len(message) > 200:
|
|
our_exit("Warning: The canned message must be less than 200 characters.")
|
|
self.ensureSessionKey()
|
|
# split into chunks
|
|
chunks = []
|
|
chunks_size = 200
|
|
for i in range(0, len(message), chunks_size):
|
|
chunks.append(message[i : i + chunks_size])
|
|
|
|
# for each chunk, send a message to set the values
|
|
# for i in range(0, len(chunks)):
|
|
for i, chunk in enumerate(chunks):
|
|
p = admin_pb2.AdminMessage()
|
|
|
|
# TODO: should be a way to improve this
|
|
if i == 0:
|
|
p.set_canned_message_module_messages = chunk
|
|
|
|
logger.debug(f"Setting canned message '{chunk}' part {i+1}")
|
|
# If sending to a remote node, wait for ACK/NAK
|
|
if self == self.iface.localNode:
|
|
onResponse = None
|
|
else:
|
|
onResponse = self.onAckNak
|
|
return self._sendAdmin(p, onResponse=onResponse)
|
|
|
|
def exitSimulator(self):
|
|
"""Tell a simulator node to exit (this message
|
|
is ignored for other nodes)"""
|
|
self.ensureSessionKey()
|
|
p = admin_pb2.AdminMessage()
|
|
p.exit_simulator = True
|
|
logger.debug("in exitSimulator()")
|
|
|
|
return self._sendAdmin(p)
|
|
|
|
def reboot(self, secs: int = 10):
|
|
"""Tell the node to reboot."""
|
|
self.ensureSessionKey()
|
|
p = admin_pb2.AdminMessage()
|
|
p.reboot_seconds = secs
|
|
logger.info(f"Telling node to reboot in {secs} seconds")
|
|
|
|
# If sending to a remote node, wait for ACK/NAK
|
|
if self == self.iface.localNode:
|
|
onResponse = None
|
|
else:
|
|
onResponse = self.onAckNak
|
|
return self._sendAdmin(p, onResponse=onResponse)
|
|
|
|
def beginSettingsTransaction(self):
|
|
"""Tell the node to open a transaction to edit settings."""
|
|
self.ensureSessionKey()
|
|
p = admin_pb2.AdminMessage()
|
|
p.begin_edit_settings = True
|
|
logger.info(f"Telling open a transaction to edit settings")
|
|
|
|
# If sending to a remote node, wait for ACK/NAK
|
|
if self == self.iface.localNode:
|
|
onResponse = None
|
|
else:
|
|
onResponse = self.onAckNak
|
|
return self._sendAdmin(p, onResponse=onResponse)
|
|
|
|
def commitSettingsTransaction(self):
|
|
"""Tell the node to commit the open transaction for editing settings."""
|
|
self.ensureSessionKey()
|
|
p = admin_pb2.AdminMessage()
|
|
p.commit_edit_settings = True
|
|
logger.info(f"Telling node to commit open transaction for editing settings")
|
|
|
|
# If sending to a remote node, wait for ACK/NAK
|
|
if self == self.iface.localNode:
|
|
onResponse = None
|
|
else:
|
|
onResponse = self.onAckNak
|
|
return self._sendAdmin(p, onResponse=onResponse)
|
|
|
|
def rebootOTA(self, secs: int = 10):
|
|
"""Tell the node to reboot into factory firmware."""
|
|
self.ensureSessionKey()
|
|
p = admin_pb2.AdminMessage()
|
|
p.reboot_ota_seconds = secs
|
|
logger.info(f"Telling node to reboot to OTA in {secs} seconds")
|
|
|
|
# If sending to a remote node, wait for ACK/NAK
|
|
if self == self.iface.localNode:
|
|
onResponse = None
|
|
else:
|
|
onResponse = self.onAckNak
|
|
return self._sendAdmin(p, onResponse=onResponse)
|
|
|
|
def enterDFUMode(self):
|
|
"""Tell the node to enter DFU mode (NRF52)."""
|
|
self.ensureSessionKey()
|
|
p = admin_pb2.AdminMessage()
|
|
p.enter_dfu_mode_request = True
|
|
logger.info(f"Telling node to enable DFU mode")
|
|
|
|
# If sending to a remote node, wait for ACK/NAK
|
|
if self == self.iface.localNode:
|
|
onResponse = None
|
|
else:
|
|
onResponse = self.onAckNak
|
|
return self._sendAdmin(p, onResponse=onResponse)
|
|
|
|
def shutdown(self, secs: int = 10):
|
|
"""Tell the node to shutdown."""
|
|
self.ensureSessionKey()
|
|
p = admin_pb2.AdminMessage()
|
|
p.shutdown_seconds = secs
|
|
logger.info(f"Telling node to shutdown in {secs} seconds")
|
|
|
|
# If sending to a remote node, wait for ACK/NAK
|
|
if self == self.iface.localNode:
|
|
onResponse = None
|
|
else:
|
|
onResponse = self.onAckNak
|
|
return self._sendAdmin(p, onResponse=onResponse)
|
|
|
|
def getMetadata(self):
|
|
"""Get the node's metadata."""
|
|
p = admin_pb2.AdminMessage()
|
|
p.get_device_metadata_request = True
|
|
logger.info(f"Requesting device metadata")
|
|
|
|
self._sendAdmin(
|
|
p, wantResponse=True, onResponse=self.onRequestGetMetadata
|
|
)
|
|
self.iface.waitForAckNak()
|
|
|
|
def factoryReset(self, full: bool = False):
|
|
"""Tell the node to factory reset."""
|
|
self.ensureSessionKey()
|
|
p = admin_pb2.AdminMessage()
|
|
if full:
|
|
p.factory_reset_device = True
|
|
logger.info(f"Telling node to factory reset (full device reset)")
|
|
else:
|
|
p.factory_reset_config = True
|
|
logger.info(f"Telling node to factory reset (config reset)")
|
|
|
|
# If sending to a remote node, wait for ACK/NAK
|
|
if self == self.iface.localNode:
|
|
onResponse = None
|
|
else:
|
|
onResponse = self.onAckNak
|
|
return self._sendAdmin(p, onResponse=onResponse)
|
|
|
|
def removeNode(self, nodeId: Union[int, str]):
|
|
"""Tell the node to remove a specific node by ID"""
|
|
self.ensureSessionKey()
|
|
nodeId = to_node_num(nodeId)
|
|
|
|
p = admin_pb2.AdminMessage()
|
|
p.remove_by_nodenum = nodeId
|
|
|
|
if self == self.iface.localNode:
|
|
onResponse = None
|
|
else:
|
|
onResponse = self.onAckNak
|
|
return self._sendAdmin(p, onResponse=onResponse)
|
|
|
|
def setFavorite(self, nodeId: Union[int, str]):
|
|
"""Tell the node to set the specified node ID to be favorited on the NodeDB on the device"""
|
|
self.ensureSessionKey()
|
|
nodeId = to_node_num(nodeId)
|
|
|
|
p = admin_pb2.AdminMessage()
|
|
p.set_favorite_node = nodeId
|
|
|
|
if self == self.iface.localNode:
|
|
onResponse = None
|
|
else:
|
|
onResponse = self.onAckNak
|
|
return self._sendAdmin(p, onResponse=onResponse)
|
|
|
|
def removeFavorite(self, nodeId: Union[int, str]):
|
|
"""Tell the node to set the specified node ID to be un-favorited on the NodeDB on the device"""
|
|
self.ensureSessionKey()
|
|
nodeId = to_node_num(nodeId)
|
|
|
|
p = admin_pb2.AdminMessage()
|
|
p.remove_favorite_node = nodeId
|
|
|
|
if self == self.iface.localNode:
|
|
onResponse = None
|
|
else:
|
|
onResponse = self.onAckNak
|
|
return self._sendAdmin(p, onResponse=onResponse)
|
|
|
|
def setIgnored(self, nodeId: Union[int, str]):
|
|
"""Tell the node to set the specified node ID to be ignored on the NodeDB on the device"""
|
|
self.ensureSessionKey()
|
|
nodeId = to_node_num(nodeId)
|
|
|
|
p = admin_pb2.AdminMessage()
|
|
p.set_ignored_node = nodeId
|
|
|
|
if self == self.iface.localNode:
|
|
onResponse = None
|
|
else:
|
|
onResponse = self.onAckNak
|
|
return self._sendAdmin(p, onResponse=onResponse)
|
|
|
|
def removeIgnored(self, nodeId: Union[int, str]):
|
|
"""Tell the node to set the specified node ID to be un-ignored on the NodeDB on the device"""
|
|
self.ensureSessionKey()
|
|
nodeId = to_node_num(nodeId)
|
|
|
|
p = admin_pb2.AdminMessage()
|
|
p.remove_ignored_node = nodeId
|
|
|
|
if self == self.iface.localNode:
|
|
onResponse = None
|
|
else:
|
|
onResponse = self.onAckNak
|
|
return self._sendAdmin(p, onResponse=onResponse)
|
|
|
|
def resetNodeDb(self):
|
|
"""Tell the node to reset its list of nodes."""
|
|
self.ensureSessionKey()
|
|
p = admin_pb2.AdminMessage()
|
|
p.nodedb_reset = True
|
|
logger.info(f"Telling node to reset the NodeDB")
|
|
|
|
# If sending to a remote node, wait for ACK/NAK
|
|
if self == self.iface.localNode:
|
|
onResponse = None
|
|
else:
|
|
onResponse = self.onAckNak
|
|
return self._sendAdmin(p, onResponse=onResponse)
|
|
|
|
def setFixedPosition(self, lat: Union[int, float], lon: Union[int, float], alt: int):
|
|
"""Tell the node to set fixed position to the provided value and enable the fixed position setting"""
|
|
self.ensureSessionKey()
|
|
|
|
p = mesh_pb2.Position()
|
|
if isinstance(lat, float) and lat != 0.0:
|
|
p.latitude_i = int(lat / 1e-7)
|
|
elif isinstance(lat, int) and lat != 0:
|
|
p.latitude_i = lat
|
|
|
|
if isinstance(lon, float) and lon != 0.0:
|
|
p.longitude_i = int(lon / 1e-7)
|
|
elif isinstance(lon, int) and lon != 0:
|
|
p.longitude_i = lon
|
|
|
|
if alt != 0:
|
|
p.altitude = alt
|
|
|
|
a = admin_pb2.AdminMessage()
|
|
a.set_fixed_position.CopyFrom(p)
|
|
|
|
if self == self.iface.localNode:
|
|
onResponse = None
|
|
else:
|
|
onResponse = self.onAckNak
|
|
return self._sendAdmin(a, onResponse=onResponse)
|
|
|
|
def removeFixedPosition(self):
|
|
"""Tell the node to remove the fixed position and set the fixed position setting to false"""
|
|
self.ensureSessionKey()
|
|
p = admin_pb2.AdminMessage()
|
|
p.remove_fixed_position = True
|
|
logger.info(f"Telling node to remove fixed position")
|
|
|
|
if self == self.iface.localNode:
|
|
onResponse = None
|
|
else:
|
|
onResponse = self.onAckNak
|
|
return self._sendAdmin(p, onResponse=onResponse)
|
|
|
|
def setTime(self, timeSec: int = 0):
|
|
"""Tell the node to set its time to the provided timestamp, or the system's current time if not provided or 0."""
|
|
self.ensureSessionKey()
|
|
if timeSec == 0:
|
|
timeSec = int(time.time())
|
|
p = admin_pb2.AdminMessage()
|
|
p.set_time_only = timeSec
|
|
logger.info(f"Setting node time to {timeSec}")
|
|
|
|
if self == self.iface.localNode:
|
|
onResponse = None
|
|
else:
|
|
onResponse = self.onAckNak
|
|
return self._sendAdmin(p, onResponse=onResponse)
|
|
|
|
def _fixupChannels(self):
|
|
"""Fixup indexes and add disabled channels as needed"""
|
|
|
|
# Add extra disabled channels as needed
|
|
# This is needed because the protobufs will have index **missing** if the channel number is zero
|
|
for index, ch in enumerate(self.channels):
|
|
ch.index = index # fixup indexes
|
|
|
|
self._fillChannels()
|
|
|
|
def _fillChannels(self):
|
|
"""Mark unused channels as disabled"""
|
|
|
|
# Add extra disabled channels as needed
|
|
index = len(self.channels)
|
|
while index < 8:
|
|
ch = channel_pb2.Channel()
|
|
ch.role = channel_pb2.Channel.Role.DISABLED
|
|
ch.index = index
|
|
self.channels.append(ch)
|
|
index += 1
|
|
|
|
def onRequestGetMetadata(self, p):
|
|
"""Handle the response packet for requesting device metadata getMetadata()"""
|
|
logger.debug(f"onRequestGetMetadata() p:{p}")
|
|
|
|
if "routing" in p["decoded"]:
|
|
if p["decoded"]["routing"]["errorReason"] != "NONE":
|
|
print(f'Error on response: {p["decoded"]["routing"]["errorReason"]}')
|
|
self.iface._acknowledgment.receivedNak = True
|
|
else:
|
|
self.iface._acknowledgment.receivedAck = True
|
|
if p["decoded"]["portnum"] == portnums_pb2.PortNum.Name(
|
|
portnums_pb2.PortNum.ROUTING_APP
|
|
):
|
|
if p["decoded"]["routing"]["errorReason"] != "NONE":
|
|
logger.warning(
|
|
f'Metadata request failed, error reason: {p["decoded"]["routing"]["errorReason"]}'
|
|
)
|
|
self._timeout.expireTime = time.time() # Do not wait any longer
|
|
return # Don't try to parse this routing message
|
|
logger.debug(f"Retrying metadata request.")
|
|
self.getMetadata()
|
|
return
|
|
|
|
c = p["decoded"]["admin"]["raw"].get_device_metadata_response
|
|
self._timeout.reset() # We made forward progress
|
|
logger.debug(f"Received metadata {stripnl(c)}")
|
|
print(f"\nfirmware_version: {c.firmware_version}")
|
|
print(f"device_state_version: {c.device_state_version}")
|
|
if c.role in config_pb2.Config.DeviceConfig.Role.values():
|
|
print(f"role: {config_pb2.Config.DeviceConfig.Role.Name(c.role)}")
|
|
else:
|
|
print(f"role: {c.role}")
|
|
print(f"position_flags: {self.position_flags_list(c.position_flags)}")
|
|
if c.hw_model in mesh_pb2.HardwareModel.values():
|
|
print(f"hw_model: {mesh_pb2.HardwareModel.Name(c.hw_model)}")
|
|
else:
|
|
print(f"hw_model: {c.hw_model}")
|
|
print(f"hasPKC: {c.hasPKC}")
|
|
if c.excluded_modules > 0:
|
|
print(f"excluded_modules: {self.excluded_modules_list(c.excluded_modules)}")
|
|
|
|
def onResponseRequestChannel(self, p):
|
|
"""Handle the response packet for requesting a channel _requestChannel()"""
|
|
logger.debug(f"onResponseRequestChannel() p:{p}")
|
|
|
|
if p["decoded"]["portnum"] == portnums_pb2.PortNum.Name(
|
|
portnums_pb2.PortNum.ROUTING_APP
|
|
):
|
|
if p["decoded"]["routing"]["errorReason"] != "NONE":
|
|
logger.warning(
|
|
f'Channel request failed, error reason: {p["decoded"]["routing"]["errorReason"]}'
|
|
)
|
|
self._timeout.expireTime = time.time() # Do not wait any longer
|
|
return # Don't try to parse this routing message
|
|
lastTried = 0
|
|
if len(self.partialChannels) > 0:
|
|
lastTried = self.partialChannels[-1].index
|
|
logger.debug(f"Retrying previous channel request.")
|
|
self._requestChannel(lastTried)
|
|
return
|
|
|
|
c = p["decoded"]["admin"]["raw"].get_channel_response
|
|
self.partialChannels.append(c)
|
|
self._timeout.reset() # We made forward progress
|
|
logger.debug(f"Received channel {stripnl(c)}")
|
|
index = c.index
|
|
|
|
if index >= 8 - 1:
|
|
logger.debug("Finished downloading channels")
|
|
|
|
self.channels = self.partialChannels
|
|
self._fixupChannels()
|
|
else:
|
|
self._requestChannel(index + 1)
|
|
|
|
def onAckNak(self, p):
|
|
"""Informative handler for ACK/NAK responses"""
|
|
if p["decoded"]["routing"]["errorReason"] != "NONE":
|
|
print(
|
|
f'Received a NAK, error reason: {p["decoded"]["routing"]["errorReason"]}'
|
|
)
|
|
self.iface._acknowledgment.receivedNak = True
|
|
else:
|
|
if int(p["from"]) == self.iface.localNode.nodeNum:
|
|
print(
|
|
f"Received an implicit ACK. Packet will likely arrive, but cannot be guaranteed."
|
|
)
|
|
self.iface._acknowledgment.receivedImplAck = True
|
|
else:
|
|
print(f"Received an ACK.")
|
|
self.iface._acknowledgment.receivedAck = True
|
|
|
|
def _requestChannel(self, channelNum: int):
|
|
"""Done with initial config messages, now send regular
|
|
MeshPackets to ask for settings"""
|
|
p = admin_pb2.AdminMessage()
|
|
p.get_channel_request = channelNum + 1
|
|
|
|
# Show progress message for super slow operations
|
|
if self != self.iface.localNode:
|
|
print(
|
|
f"Requesting channel {channelNum} info from remote node (this could take a while)"
|
|
)
|
|
logger.debug(
|
|
f"Requesting channel {channelNum} info from remote node (this could take a while)"
|
|
)
|
|
else:
|
|
logger.debug(f"Requesting channel {channelNum}")
|
|
|
|
return self._sendAdmin(
|
|
p, wantResponse=True, onResponse=self.onResponseRequestChannel
|
|
)
|
|
|
|
# pylint: disable=R1710
|
|
def _sendAdmin(
|
|
self,
|
|
p: admin_pb2.AdminMessage,
|
|
wantResponse: bool=True,
|
|
onResponse=None,
|
|
adminIndex: int=0,
|
|
):
|
|
"""Send an admin message to the specified node (or the local node if destNodeNum is zero)"""
|
|
|
|
if self.noProto:
|
|
logger.warning(
|
|
f"Not sending packet because protocol use is disabled by noProto"
|
|
)
|
|
else:
|
|
if (
|
|
adminIndex == 0
|
|
): # unless a special channel index was used, we want to use the admin index
|
|
adminIndex = self.iface.localNode._getAdminChannelIndex()
|
|
logger.debug(f"adminIndex:{adminIndex}")
|
|
nodeid = to_node_num(self.nodeNum)
|
|
if "adminSessionPassKey" in self.iface._getOrCreateByNum(nodeid):
|
|
p.session_passkey = self.iface._getOrCreateByNum(nodeid).get("adminSessionPassKey")
|
|
return self.iface.sendData(
|
|
p,
|
|
self.nodeNum,
|
|
portNum=portnums_pb2.PortNum.ADMIN_APP,
|
|
wantAck=True,
|
|
wantResponse=wantResponse,
|
|
onResponse=onResponse,
|
|
channelIndex=adminIndex,
|
|
pkiEncrypted=True,
|
|
)
|
|
|
|
def ensureSessionKey(self):
|
|
"""If our entry in iface.nodesByNum doesn't already have an adminSessionPassKey, make a request to get one"""
|
|
if self.noProto:
|
|
logger.warning(
|
|
f"Not ensuring session key, because protocol use is disabled by noProto"
|
|
)
|
|
else:
|
|
nodeid = to_node_num(self.nodeNum)
|
|
if self.iface._getOrCreateByNum(nodeid).get("adminSessionPassKey") is None:
|
|
self.requestConfig(admin_pb2.AdminMessage.SESSIONKEY_CONFIG)
|
|
|
|
def get_channels_with_hash(self):
|
|
"""Return a list of dicts with channel info and hash."""
|
|
result = []
|
|
if self.channels:
|
|
for c in self.channels:
|
|
if c.settings and hasattr(c.settings, "name") and hasattr(c.settings, "psk"):
|
|
hash_val = generate_channel_hash(c.settings.name, c.settings.psk)
|
|
else:
|
|
hash_val = None
|
|
result.append({
|
|
"index": c.index,
|
|
"role": channel_pb2.Channel.Role.Name(c.role),
|
|
"name": c.settings.name if c.settings and hasattr(c.settings, "name") else "",
|
|
"hash": hash_val,
|
|
})
|
|
return result
|