establish trunk format

This commit is contained in:
Thomas Göttgens
2023-03-31 13:12:30 +02:00
parent 78f85efe56
commit 7e6f13f0a2
62 changed files with 3856 additions and 2664 deletions

View File

@@ -1,12 +1,21 @@
"""Node class
"""
import logging
import base64
import logging
import time
from google.protobuf.json_format import MessageToJson
from meshtastic import portnums_pb2, apponly_pb2, admin_pb2, channel_pb2, localonly_pb2
from meshtastic.util import pskToString, stripnl, Timeout, our_exit, fromPSK, camel_to_snake
from meshtastic import admin_pb2, apponly_pb2, channel_pb2, localonly_pb2, portnums_pb2
from meshtastic.util import (
Timeout,
camel_to_snake,
fromPSK,
our_exit,
pskToString,
stripnl,
)
class Node:
@@ -36,13 +45,15 @@ class Node:
"""Show human readable description of our channels."""
print("Channels:")
if self.channels:
logging.debug(f'self.channels:{self.channels}')
logging.debug(f"self.channels:{self.channels}")
for c in self.channels:
#print('c.settings.psk:', c.settings.psk)
# print('c.settings.psk:', c.settings.psk)
cStr = stripnl(MessageToJson(c.settings))
# only show if there is no psk (meaning disabled channel)
if c.settings.psk:
print(f" {channel_pb2.Channel.Role.Name(c.role)} psk={pskToString(c.settings.psk)} {cStr}")
print(
f" {channel_pb2.Channel.Role.Name(c.role)} psk={pskToString(c.settings.psk)} {cStr}"
)
publicURL = self.getURL(includeAll=False)
adminURL = self.getURL(includeAll=True)
print(f"\nPrimary channel URL: {publicURL}")
@@ -68,10 +79,10 @@ class Node:
self.partialChannels = [] # We keep our channels in a temp array until finished
self._requestChannel(0)
def onResponseRequestSettings(self, p):
"""Handle the response packets for requesting settings _requestSettings()"""
logging.debug(f'onResponseRequestSetting() p:{p}')
logging.debug(f"onResponseRequestSetting() p:{p}")
if "routing" in p["decoded"]:
if p["decoded"]["routing"]["errorReason"] != "NONE":
print(f'Error on response: {p["decoded"]["routing"]["errorReason"]}')
@@ -83,15 +94,21 @@ class Node:
if "getConfigResponse" in adminMessage:
resp = adminMessage["getConfigResponse"]
field = list(resp.keys())[0]
config_type = self.localConfig.DESCRIPTOR.fields_by_name.get(camel_to_snake(field))
config_type = self.localConfig.DESCRIPTOR.fields_by_name.get(
camel_to_snake(field)
)
config_values = getattr(self.localConfig, config_type.name)
elif "getModuleConfigResponse" in adminMessage:
resp = adminMessage["getModuleConfigResponse"]
field = list(resp.keys())[0]
config_type = self.moduleConfig.DESCRIPTOR.fields_by_name.get(camel_to_snake(field))
config_type = self.moduleConfig.DESCRIPTOR.fields_by_name.get(
camel_to_snake(field)
)
config_values = getattr(self.moduleConfig, config_type.name)
else:
print("Did not receive a valid response. Make sure to have a shared channel named 'admin'.")
else:
print(
"Did not receive a valid response. Make sure to have a shared channel named 'admin'."
)
return
for key, value in resp[field].items():
setattr(config_values, camel_to_snake(key), value)
@@ -100,7 +117,7 @@ class Node:
def requestConfig(self, configType):
if self == self.iface.localNode:
onResponse = None
else:
else:
onResponse = self.onResponseRequestSettings
print("Requesting current config from remote node (this can take a while).")
@@ -109,7 +126,7 @@ class Node:
p = admin_pb2.AdminMessage()
p.get_config_request = msgIndex
self._sendAdmin(p, wantResponse=True, onResponse=onResponse)
else:
else:
p = admin_pb2.AdminMessage()
p.get_module_config_request = msgIndex
self._sendAdmin(p, wantResponse=True, onResponse=onResponse)
@@ -122,9 +139,9 @@ class Node:
print("Writing modified channels to device")
self.writeChannel(0)
def waitForConfig(self, attribute='channels'):
def waitForConfig(self, attribute="channels"):
"""Block until radio config is received. Returns True if config has been received."""
return self._timeout.waitForSet(self, attrs=('localConfig', attribute))
return self._timeout.waitForSet(self, attrs=("localConfig", attribute))
def writeConfig(self):
"""Write the current (edited) localConfig to the device"""
@@ -196,18 +213,20 @@ class Node:
if self.moduleConfig.external_notification:
p = admin_pb2.AdminMessage()
p.set_module_config.external_notification.CopyFrom(self.moduleConfig.external_notification)
p.set_module_config.external_notification.CopyFrom(
self.moduleConfig.external_notification
)
self._sendAdmin(p)
logging.debug("Wrote module: external_notification")
time.sleep(0.3)
if self.moduleConfig.store_forward:
p = admin_pb2.AdminMessage()
p.set_module_config.store_forward.CopyFrom(self.moduleConfig.store_forward)
self._sendAdmin(p)
logging.debug("Wrote module: store_forward")
time.sleep(0.3)
if self.moduleConfig.range_test:
p = admin_pb2.AdminMessage()
p.set_module_config.range_test.CopyFrom(self.moduleConfig.range_test)
@@ -224,7 +243,9 @@ class Node:
if self.moduleConfig.canned_message:
p = admin_pb2.AdminMessage()
p.set_module_config.canned_message.CopyFrom(self.moduleConfig.canned_message)
p.set_module_config.canned_message.CopyFrom(
self.moduleConfig.canned_message
)
self._sendAdmin(p)
logging.debug("Wrote module: canned_message")
time.sleep(0.3)
@@ -238,7 +259,9 @@ class Node:
if self.moduleConfig.remote_hardware:
p = admin_pb2.AdminMessage()
p.set_module_config.remote_hardware.CopyFrom(self.moduleConfig.remote_hardware)
p.set_module_config.remote_hardware.CopyFrom(
self.moduleConfig.remote_hardware
)
self._sendAdmin(p)
logging.debug("Wrote module: remote_hardware")
time.sleep(0.3)
@@ -250,45 +273,51 @@ class Node:
p = admin_pb2.AdminMessage()
if config_name == 'device':
if config_name == "device":
p.set_config.device.CopyFrom(self.localConfig.device)
elif config_name == 'position':
elif config_name == "position":
p.set_config.position.CopyFrom(self.localConfig.position)
elif config_name == 'power':
elif config_name == "power":
p.set_config.power.CopyFrom(self.localConfig.power)
elif config_name == 'network':
elif config_name == "network":
p.set_config.network.CopyFrom(self.localConfig.network)
elif config_name == 'display':
elif config_name == "display":
p.set_config.display.CopyFrom(self.localConfig.display)
elif config_name == 'lora':
elif config_name == "lora":
p.set_config.lora.CopyFrom(self.localConfig.lora)
elif config_name == 'bluetooth':
elif config_name == "bluetooth":
p.set_config.bluetooth.CopyFrom(self.localConfig.bluetooth)
elif config_name == 'mqtt':
elif config_name == "mqtt":
p.set_module_config.mqtt.CopyFrom(self.moduleConfig.mqtt)
elif config_name == 'serial':
elif config_name == "serial":
p.set_module_config.serial.CopyFrom(self.moduleConfig.serial)
elif config_name == 'external_notification':
p.set_module_config.external_notification.CopyFrom(self.moduleConfig.external_notification)
elif config_name == 'store_forward':
elif config_name == "external_notification":
p.set_module_config.external_notification.CopyFrom(
self.moduleConfig.external_notification
)
elif config_name == "store_forward":
p.set_module_config.store_forward.CopyFrom(self.moduleConfig.store_forward)
elif config_name == 'range_test':
elif config_name == "range_test":
p.set_module_config.range_test.CopyFrom(self.moduleConfig.range_test)
elif config_name == 'telemetry':
elif config_name == "telemetry":
p.set_module_config.telemetry.CopyFrom(self.moduleConfig.telemetry)
elif config_name == 'canned_message':
p.set_module_config.canned_message.CopyFrom(self.moduleConfig.canned_message)
elif config_name == 'audio':
elif config_name == "canned_message":
p.set_module_config.canned_message.CopyFrom(
self.moduleConfig.canned_message
)
elif config_name == "audio":
p.set_module_config.audio.CopyFrom(self.moduleConfig.audio)
elif config_name == 'remote_hardware':
p.set_module_config.remote_hardware.CopyFrom(self.moduleConfig.remote_hardware)
elif config_name == "remote_hardware":
p.set_module_config.remote_hardware.CopyFrom(
self.moduleConfig.remote_hardware
)
else:
our_exit(f"Error: No valid config with name {config_name}")
logging.debug(f"Wrote: {config_name}")
if self == self.iface.localNode:
onResponse = None
else:
else:
onResponse = self.onAckNak
self._sendAdmin(p, onResponse=onResponse)
@@ -302,8 +331,8 @@ class Node:
def getChannelByChannelIndex(self, channelIndex):
"""Get channel by channelIndex
channelIndex: number, typically 0-7; based on max number channels
returns: None if there is no channel found
channelIndex: number, typically 0-7; based on max number channels
returns: None if there is no channel found
"""
ch = None
if self.channels and 0 <= channelIndex < len(self.channels):
@@ -313,7 +342,10 @@ class Node:
def deleteChannel(self, channelIndex):
"""Delete the specifed channelIndex and shift other channels up"""
ch = self.channels[channelIndex]
if ch.role not in (channel_pb2.Channel.Role.SECONDARY, channel_pb2.Channel.Role.DISABLED):
if ch.role not in (
channel_pb2.Channel.Role.SECONDARY,
channel_pb2.Channel.Role.DISABLED,
):
our_exit("Warning: Only SECONDARY channels can be deleted")
# we are careful here because if we move the "admin" channel the channelIndex we need to use
@@ -337,7 +369,7 @@ class Node:
def getChannelByName(self, name):
"""Try to find the named channel or return None"""
for c in (self.channels or []):
for c in self.channels or []:
if c.settings and c.settings.name == name:
return c
return None
@@ -375,13 +407,13 @@ class Node:
p.set_owner.short_name = short_name
# Note: These debug lines are used in unit tests
logging.debug(f'p.set_owner.long_name:{p.set_owner.long_name}:')
logging.debug(f'p.set_owner.short_name:{p.set_owner.short_name}:')
logging.debug(f'p.set_owner.is_licensed:{p.set_owner.is_licensed}')
logging.debug(f"p.set_owner.long_name:{p.set_owner.long_name}:")
logging.debug(f"p.set_owner.short_name:{p.set_owner.short_name}:")
logging.debug(f"p.set_owner.is_licensed:{p.set_owner.is_licensed}")
# If sending to a remote node, wait for ACK/NAK
if self == self.iface.localNode:
onResponse = None
else:
else:
onResponse = self.onAckNak
return self._sendAdmin(p, onResponse=onResponse)
@@ -391,12 +423,14 @@ class Node:
channelSet = apponly_pb2.ChannelSet()
if self.channels:
for c in self.channels:
if c.role == channel_pb2.Channel.Role.PRIMARY or (includeAll and c.role == channel_pb2.Channel.Role.SECONDARY):
if c.role == channel_pb2.Channel.Role.PRIMARY or (
includeAll and c.role == channel_pb2.Channel.Role.SECONDARY
):
channelSet.settings.append(c.settings)
channelSet.lora_config.CopyFrom(self.localConfig.lora)
some_bytes = channelSet.SerializeToString()
s = base64.urlsafe_b64encode(some_bytes).decode('ascii')
s = base64.urlsafe_b64encode(some_bytes).decode("ascii")
s = s.replace("=", "").replace("+", "-").replace("/", "_")
return f"https://meshtastic.org/e/#{s}"
@@ -415,24 +449,27 @@ class Node:
# per https://stackoverflow.com/a/9807138
missing_padding = len(b64) % 4
if missing_padding:
b64 += '=' * (4 - missing_padding)
b64 += "=" * (4 - missing_padding)
decodedURL = base64.urlsafe_b64decode(b64)
channelSet = apponly_pb2.ChannelSet()
channelSet.ParseFromString(decodedURL)
if len(channelSet.settings) == 0:
our_exit("Warning: There were no settings.")
i = 0
for chs in channelSet.settings:
ch = channel_pb2.Channel()
ch.role = channel_pb2.Channel.Role.PRIMARY if i == 0 else channel_pb2.Channel.Role.SECONDARY
ch.role = (
channel_pb2.Channel.Role.PRIMARY
if i == 0
else channel_pb2.Channel.Role.SECONDARY
)
ch.index = i
ch.settings.CopyFrom(chs)
self.channels[ch.index] = ch
logging.debug(f'Channel i:{i} ch:{ch}')
logging.debug(f"Channel i:{i} ch:{ch}")
self.writeChannel(ch.index)
i = i + 1
@@ -442,7 +479,7 @@ class Node:
def onResponseRequestRingtone(self, p):
"""Handle the response packet for requesting ringtone part 1"""
logging.debug(f'onResponseRequestRingtone() p:{p}')
logging.debug(f"onResponseRequestRingtone() p:{p}")
errorFound = False
if "routing" in p["decoded"]:
if p["decoded"]["routing"]["errorReason"] != "NONE":
@@ -452,30 +489,33 @@ class Node:
if "decoded" in p:
if "admin" in p["decoded"]:
if "raw" in p["decoded"]["admin"]:
self.ringtonePart = p["decoded"]["admin"]["raw"].get_ringtone_response
logging.debug(f'self.ringtonePart:{self.ringtonePart}')
self.ringtonePart = p["decoded"]["admin"][
"raw"
].get_ringtone_response
logging.debug(f"self.ringtonePart:{self.ringtonePart}")
self.gotResponse = True
def get_ringtone(self):
"""Get the ringtone. Concatenate all pieces together and return a single string."""
logging.debug(f'in get_ringtone()')
logging.debug(f"in get_ringtone()")
if not self.ringtone:
p1 = admin_pb2.AdminMessage()
p1.get_ringtone_request = True
self.gotResponse = False
self._sendAdmin(p1, wantResponse=True, onResponse=self.onResponseRequestRingtone)
self._sendAdmin(
p1, wantResponse=True, onResponse=self.onResponseRequestRingtone
)
while self.gotResponse is False:
time.sleep(0.1)
logging.debug(f'self.ringtone:{self.ringtone}')
logging.debug(f"self.ringtone:{self.ringtone}")
self.ringtone = ""
if self.ringtonePart:
self.ringtone += self.ringtonePart
print(f'ringtone:{self.ringtone}')
logging.debug(f'ringtone:{self.ringtone}')
print(f"ringtone:{self.ringtone}")
logging.debug(f"ringtone:{self.ringtone}")
return self.ringtone
def set_ringtone(self, ringtone):
@@ -488,10 +528,10 @@ class Node:
chunks = []
chunks_size = 230
for i in range(0, len(ringtone), chunks_size):
chunks.append(ringtone[i: i + chunks_size])
chunks.append(ringtone[i : i + chunks_size])
# for each chunk, send a message to set the values
#for i in range(0, len(chunks)):
# for i in range(0, len(chunks)):
for i, chunk in enumerate(chunks):
p = admin_pb2.AdminMessage()
@@ -503,13 +543,13 @@ class Node:
# If sending to a remote node, wait for ACK/NAK
if self == self.iface.localNode:
onResponse = None
else:
else:
onResponse = self.onAckNak
return self._sendAdmin(p, onResponse=onResponse)
def onResponseRequestCannedMessagePluginMessageMessages(self, p):
"""Handle the response packet for requesting canned message plugin message part 1"""
logging.debug(f'onResponseRequestCannedMessagePluginMessageMessages() p:{p}')
logging.debug(f"onResponseRequestCannedMessagePluginMessageMessages() p:{p}")
errorFound = False
if "routing" in p["decoded"]:
if p["decoded"]["routing"]["errorReason"] != "NONE":
@@ -519,31 +559,39 @@ class Node:
if "decoded" in p:
if "admin" in p["decoded"]:
if "raw" in p["decoded"]["admin"]:
self.cannedPluginMessageMessages = p["decoded"]["admin"]["raw"].get_canned_message_module_messages_response
logging.debug(f'self.cannedPluginMessageMessages:{self.cannedPluginMessageMessages}')
self.cannedPluginMessageMessages = p["decoded"]["admin"][
"raw"
].get_canned_message_module_messages_response
logging.debug(
f"self.cannedPluginMessageMessages:{self.cannedPluginMessageMessages}"
)
self.gotResponse = True
def get_canned_message(self):
"""Get the canned message string. Concatenate all pieces together and return a single string."""
logging.debug(f'in get_canned_message()')
logging.debug(f"in get_canned_message()")
if not self.cannedPluginMessage:
p1 = admin_pb2.AdminMessage()
p1.get_canned_message_module_messages_request = True
self.gotResponse = False
self._sendAdmin(p1, wantResponse=True, onResponse=self.onResponseRequestCannedMessagePluginMessageMessages)
self._sendAdmin(
p1,
wantResponse=True,
onResponse=self.onResponseRequestCannedMessagePluginMessageMessages,
)
while self.gotResponse is False:
time.sleep(0.1)
logging.debug(f'self.cannedPluginMessageMessages:{self.cannedPluginMessageMessages}')
logging.debug(
f"self.cannedPluginMessageMessages:{self.cannedPluginMessageMessages}"
)
self.cannedPluginMessage = ""
if self.cannedPluginMessageMessages:
self.cannedPluginMessage += self.cannedPluginMessageMessages
print(f'canned_plugin_message:{self.cannedPluginMessage}')
logging.debug(f'canned_plugin_message:{self.cannedPluginMessage}')
print(f"canned_plugin_message:{self.cannedPluginMessage}")
logging.debug(f"canned_plugin_message:{self.cannedPluginMessage}")
return self.cannedPluginMessage
def set_canned_message(self, message):
@@ -556,10 +604,10 @@ class Node:
chunks = []
chunks_size = 200
for i in range(0, len(message), chunks_size):
chunks.append(message[i: i + chunks_size])
chunks.append(message[i : i + chunks_size])
# for each chunk, send a message to set the values
#for i in range(0, len(chunks)):
# for i in range(0, len(chunks)):
for i, chunk in enumerate(chunks):
p = admin_pb2.AdminMessage()
@@ -571,16 +619,16 @@ class Node:
# If sending to a remote node, wait for ACK/NAK
if self == self.iface.localNode:
onResponse = None
else:
else:
onResponse = self.onAckNak
return self._sendAdmin(p, onResponse=onResponse)
def exitSimulator(self):
"""Tell a simulator node to exit (this message
is ignored for other nodes)"""
is ignored for other nodes)"""
p = admin_pb2.AdminMessage()
p.exit_simulator = True
logging.debug('in exitSimulator()')
logging.debug("in exitSimulator()")
return self._sendAdmin(p)
@@ -593,10 +641,10 @@ class Node:
# If sending to a remote node, wait for ACK/NAK
if self == self.iface.localNode:
onResponse = None
else:
else:
onResponse = self.onAckNak
return self._sendAdmin(p, onResponse=onResponse)
def beginSettingsTransaction(self):
"""Tell the node to open a transaction to edit settings."""
p = admin_pb2.AdminMessage()
@@ -606,7 +654,7 @@ class Node:
# If sending to a remote node, wait for ACK/NAK
if self == self.iface.localNode:
onResponse = None
else:
else:
onResponse = self.onAckNak
return self._sendAdmin(p, onResponse=onResponse)
@@ -619,7 +667,7 @@ class Node:
# If sending to a remote node, wait for ACK/NAK
if self == self.iface.localNode:
onResponse = None
else:
else:
onResponse = self.onAckNak
return self._sendAdmin(p, onResponse=onResponse)
@@ -632,9 +680,9 @@ class Node:
# If sending to a remote node, wait for ACK/NAK
if self == self.iface.localNode:
onResponse = None
else:
else:
onResponse = self.onAckNak
return self._sendAdmin(p, onResponse=onResponse)
return self._sendAdmin(p, onResponse=onResponse)
def shutdown(self, secs: int = 10):
"""Tell the node to shutdown."""
@@ -645,7 +693,7 @@ class Node:
# If sending to a remote node, wait for ACK/NAK
if self == self.iface.localNode:
onResponse = None
else:
else:
onResponse = self.onAckNak
return self._sendAdmin(p, onResponse=onResponse)
@@ -655,7 +703,9 @@ class Node:
p.get_device_metadata_request = True
logging.info(f"Requesting device metadata")
return self._sendAdmin(p, wantResponse=True, onResponse=self.onRequestGetMetadata)
return self._sendAdmin(
p, wantResponse=True, onResponse=self.onRequestGetMetadata
)
def factoryReset(self):
"""Tell the node to factory reset."""
@@ -666,9 +716,9 @@ class Node:
# If sending to a remote node, wait for ACK/NAK
if self == self.iface.localNode:
onResponse = None
else:
else:
onResponse = self.onAckNak
return self._sendAdmin(p, onResponse=onResponse)
return self._sendAdmin(p, onResponse=onResponse)
def resetNodeDb(self):
"""Tell the node to reset its list of nodes."""
@@ -679,7 +729,7 @@ class Node:
# If sending to a remote node, wait for ACK/NAK
if self == self.iface.localNode:
onResponse = None
else:
else:
onResponse = self.onAckNak
return self._sendAdmin(p, onResponse=onResponse)
@@ -705,20 +755,23 @@ class Node:
self.channels.append(ch)
index += 1
def onRequestGetMetadata(self, p):
"""Handle the response packet for requesting device metadata getMetadata()"""
logging.debug(f'onRequestGetMetadata() p:{p}')
logging.debug(f"onRequestGetMetadata() p:{p}")
if p["decoded"]["portnum"] == portnums_pb2.PortNum.Name(portnums_pb2.PortNum.ROUTING_APP):
if p["decoded"]["portnum"] == portnums_pb2.PortNum.Name(
portnums_pb2.PortNum.ROUTING_APP
):
if p["decoded"]["routing"]["errorReason"] != "NONE":
logging.warning(f'Metadata request failed, error reason: {p["decoded"]["routing"]["errorReason"]}')
self._timeout.expireTime = time.time() # Do not wait any longer
return # Don't try to parse this routing message
logging.warning(
f'Metadata request failed, error reason: {p["decoded"]["routing"]["errorReason"]}'
)
self._timeout.expireTime = time.time() # Do not wait any longer
return # Don't try to parse this routing message
logging.debug(f"Retrying metadata request.")
self.getMetadata()
return
return
c = p["decoded"]["admin"]["raw"].get_device_metadata_response
self._timeout.reset() # We made foreward progress
logging.debug(f"Received metadata {stripnl(c)}")
@@ -727,13 +780,17 @@ class Node:
def onResponseRequestChannel(self, p):
"""Handle the response packet for requesting a channel _requestChannel()"""
logging.debug(f'onResponseRequestChannel() p:{p}')
logging.debug(f"onResponseRequestChannel() p:{p}")
if p["decoded"]["portnum"] == portnums_pb2.PortNum.Name(portnums_pb2.PortNum.ROUTING_APP):
if p["decoded"]["portnum"] == portnums_pb2.PortNum.Name(
portnums_pb2.PortNum.ROUTING_APP
):
if p["decoded"]["routing"]["errorReason"] != "NONE":
logging.warning(f'Channel request failed, error reason: {p["decoded"]["routing"]["errorReason"]}')
self._timeout.expireTime = time.time() # Do not wait any longer
return # Don't try to parse this routing message
logging.warning(
f'Channel request failed, error reason: {p["decoded"]["routing"]["errorReason"]}'
)
self._timeout.expireTime = time.time() # Do not wait any longer
return # Don't try to parse this routing message
lastTried = 0
if len(self.partialChannels) > 0:
lastTried = self.partialChannels[-1].index
@@ -752,7 +809,9 @@ class Node:
# Once we see a response that has NO settings, assume
# we are at the end of channels and stop fetching
quitEarly = (c.role == channel_pb2.Channel.Role.DISABLED) and fastChannelDownload
quitEarly = (
c.role == channel_pb2.Channel.Role.DISABLED
) and fastChannelDownload
if quitEarly or index >= self.iface.myInfo.max_channels - 1:
logging.debug("Finished downloading channels")
@@ -767,47 +826,68 @@ class Node:
def onAckNak(self, p):
if p["decoded"]["routing"]["errorReason"] != "NONE":
print(f'Received a NAK, error reason: {p["decoded"]["routing"]["errorReason"]}')
print(
f'Received a NAK, error reason: {p["decoded"]["routing"]["errorReason"]}'
)
self.iface._acknowledgment.receivedNak = True
else:
else:
if int(p["from"]) == self.iface.localNode.nodeNum:
print(f'Received an implicit ACK. Packet will likely arrive, but cannot be guaranteed.')
self.iface._acknowledgment.receivedImplAck = True
else:
print(f'Received an ACK.')
self.iface._acknowledgment.receivedAck = True
print(
f"Received an implicit ACK. Packet will likely arrive, but cannot be guaranteed."
)
self.iface._acknowledgment.receivedImplAck = True
else:
print(f"Received an ACK.")
self.iface._acknowledgment.receivedAck = True
def _requestChannel(self, channelNum: int):
"""Done with initial config messages, now send regular
MeshPackets to ask for settings"""
MeshPackets to ask for settings"""
p = admin_pb2.AdminMessage()
p.get_channel_request = channelNum + 1
# Show progress message for super slow operations
if self != self.iface.localNode:
print(f"Requesting channel {channelNum} info from remote node (this could take a while)")
logging.debug(f"Requesting channel {channelNum} info from remote node (this could take a while)")
print(
f"Requesting channel {channelNum} info from remote node (this could take a while)"
)
logging.debug(
f"Requesting channel {channelNum} info from remote node (this could take a while)"
)
else:
logging.debug(f"Requesting channel {channelNum}")
return self._sendAdmin(p, wantResponse=True, onResponse=self.onResponseRequestChannel)
return self._sendAdmin(
p, wantResponse=True, onResponse=self.onResponseRequestChannel
)
# pylint: disable=R1710
def _sendAdmin(self, p: admin_pb2.AdminMessage, wantResponse=True,
onResponse=None, adminIndex=0):
def _sendAdmin(
self,
p: admin_pb2.AdminMessage,
wantResponse=True,
onResponse=None,
adminIndex=0,
):
"""Send an admin message to the specified node (or the local node if destNodeNum is zero)"""
if self.noProto:
logging.warning(f"Not sending packet because protocol use is disabled by noProto")
logging.warning(
f"Not sending packet because protocol use is disabled by noProto"
)
else:
if adminIndex == 0: # unless a special channel index was used, we want to use the admin index
if (
adminIndex == 0
): # unless a special channel index was used, we want to use the admin index
adminIndex = self.iface.localNode._getAdminChannelIndex()
logging.debug(f'adminIndex:{adminIndex}')
return self.iface.sendData(p, self.nodeNum,
portNum=portnums_pb2.PortNum.ADMIN_APP,
wantAck=False,
wantResponse=wantResponse,
onResponse=onResponse,
channelIndex=adminIndex)
logging.debug(f"adminIndex:{adminIndex}")
return self.iface.sendData(
p,
self.nodeNum,
portNum=portnums_pb2.PortNum.ADMIN_APP,
wantAck=False,
wantResponse=wantResponse,
onResponse=onResponse,
channelIndex=adminIndex,
)