diff --git a/meshtastic/__main__.py b/meshtastic/__main__.py index a3b11ca..fd9f36b 100644 --- a/meshtastic/__main__.py +++ b/meshtastic/__main__.py @@ -177,6 +177,7 @@ def setPref(config, comp_name, valStr): def onConnected(interface): """Callback invoked when we connect to a radio""" closeNow = False # Should we drop the connection after we finish? + waitForAckNak = False # Should we wait for an acknowledgment if we send to a remote node? try: our_globals = Globals.getInstance() args = our_globals.get_args() @@ -191,6 +192,7 @@ def onConnected(interface): alt = 0 lat = 0.0 lon = 0.0 + # TODO: use getNode(args.dest) to be able to set it for a remote node localConfig = interface.localNode.localConfig if args.setalt: alt = int(args.setalt) @@ -215,19 +217,22 @@ def onConnected(interface): if args.set_owner: closeNow = True + waitForAckNak = True print(f"Setting device owner to {args.set_owner}") - interface.getNode(args.dest).setOwner(args.set_owner) + interface.getNode(args.dest, False).setOwner(args.set_owner) if args.set_owner_short: closeNow = True + waitForAckNak = True print(f"Setting device owner short to {args.set_owner_short}") - interface.getNode(args.dest).setOwner(long_name=None, short_name=args.set_owner_short) + interface.getNode(args.dest, False).setOwner(long_name=None, short_name=args.set_owner_short) # TODO: add to export-config and configure if args.set_canned_message: closeNow = True + waitForAckNak = True print(f"Setting canned plugin message to {args.set_canned_message}") - interface.getNode(args.dest).set_canned_message(args.set_canned_message) + interface.getNode(args.dest, False).set_canned_message(args.set_canned_message) if args.pos_fields: # If --pos-fields invoked with args, set position fields @@ -271,15 +276,18 @@ def onConnected(interface): if args.reboot: closeNow = True - interface.getNode(args.dest).reboot() + waitForAckNak = True + interface.getNode(args.dest, False).reboot() if args.reboot_ota: closeNow = True - interface.getNode(args.dest).rebootOTA(); + waitForAckNak = True + interface.getNode(args.dest, False).rebootOTA(); if args.shutdown: closeNow = True - interface.getNode(args.dest).shutdown() + waitForAckNak = True + interface.getNode(args.dest, False).shutdown() if args.device_metadata: closeNow = True @@ -287,11 +295,13 @@ def onConnected(interface): if args.factory_reset: closeNow = True - interface.getNode(args.dest).factoryReset() + waitForAckNak = True + interface.getNode(args.dest, False).factoryReset() if args.reset_nodedb: closeNow = True - interface.getNode(args.dest).resetNodeDb() + waitForAckNak = True + interface.getNode(args.dest, False).resetNodeDb() if args.sendtext: closeNow = True @@ -378,15 +388,18 @@ def onConnected(interface): if 'owner' in configuration: print(f"Setting device owner to {configuration['owner']}") - interface.getNode(args.dest).setOwner(configuration['owner']) + waitForAckNak = True + interface.getNode(args.dest, False).setOwner(configuration['owner']) if 'owner_short' in configuration: print(f"Setting device owner short to {configuration['owner_short']}") - interface.getNode(args.dest).setOwner(long_name=None, short_name=configuration['owner_short']) + waitForAckNak = True + interface.getNode(args.dest, False).setOwner(long_name=None, short_name=configuration['owner_short']) if 'ownerShort' in configuration: print(f"Setting device owner short to {configuration['ownerShort']}") - interface.getNode(args.dest).setOwner(long_name=None, short_name=configuration['ownerShort']) + waitForAckNak = True + interface.getNode(args.dest, False).setOwner(long_name=None, short_name=configuration['ownerShort']) if 'channel_url' in configuration: print("Setting channel url to", configuration['channel_url']) @@ -604,6 +617,10 @@ def onConnected(interface): else: tunnel.Tunnel(interface, subnet=args.tunnel_net) + if args.dest != BROADCAST_ADDR and waitForAckNak: + print(f"Waiting for an acknowledgment from remote node (this could take a while)") + interface.getNode(args.dest, False).iface.waitForAckNak() + # if the user didn't ask for serial debugging output, we might want to exit after we've done our operation if (not args.seriallog) and closeNow: interface.close() # after running command then exit diff --git a/meshtastic/mesh_interface.py b/meshtastic/mesh_interface.py index 96d170b..daf8862 100644 --- a/meshtastic/mesh_interface.py +++ b/meshtastic/mesh_interface.py @@ -18,7 +18,7 @@ from google.protobuf.json_format import MessageToJson import meshtastic.node from meshtastic import portnums_pb2, mesh_pb2 -from meshtastic.util import stripnl, Timeout, our_exit, remove_keys_from_dict, convert_mac_addr +from meshtastic.util import stripnl, Timeout, Acknowledgment, our_exit, remove_keys_from_dict, convert_mac_addr from meshtastic.__init__ import LOCAL_ADDR, BROADCAST_NUM, BROADCAST_ADDR, ResponseHandler, publishingThread, OUR_APP_VERSION, protocols class MeshInterface: @@ -47,6 +47,7 @@ class MeshInterface: self.responseHandlers = {} # A map from request ID to the handler self.failure = None # If we've encountered a fatal exception it will be kept here self._timeout = Timeout() + self._acknowledgment = Acknowledgment() self.heartbeatTimer = None random.seed() # FIXME, we should not clobber the random seedval here, instead tell user they must call it self.currentPacketId = random.randint(0, 0xffffffff) @@ -157,16 +158,18 @@ class MeshInterface: return table - def getNode(self, nodeId): + def getNode(self, nodeId, requestConfig=True): """Return a node object which contains device settings and channel info""" if nodeId in (LOCAL_ADDR, BROADCAST_ADDR): return self.localNode else: n = meshtastic.node.Node(self, nodeId) - logging.debug("About to requestConfig") - n.requestConfig() - if not n.waitForConfig(): - our_exit("Error: Timed out waiting for node config") + # Only request device settings and channel info when necessary + if requestConfig: + logging.debug("About to requestConfig") + n.requestConfig() + if not n.waitForConfig(): + our_exit("Error: Timed out waiting for node config") return n def sendText(self, text: AnyStr, @@ -366,6 +369,11 @@ class MeshInterface: if not success: raise Exception("Timed out waiting for interface config") + def waitForAckNak(self): + success = self._timeout.waitForAckNak(self._acknowledgment) + if not success: + raise Exception("Timed out waiting for an acknowledgment") + def getMyNodeInfo(self): """Get info about my node.""" if self.myInfo is None: @@ -717,7 +725,8 @@ class MeshInterface: # we keep the responseHandler in dict until we get a non ack handler = self.responseHandlers.pop(requestId, None) if handler is not None: - handler.callback(asDict) + if not isAck or (isAck and handler.__name__ == "onAckNak"): + handler.callback(asDict) logging.debug(f"Publishing {topic}: packet={stripnl(asDict)} ") publishingThread.queueWork(lambda: pub.sendMessage( diff --git a/meshtastic/node.py b/meshtastic/node.py index 8ba8cb2..a27c3e6 100644 --- a/meshtastic/node.py +++ b/meshtastic/node.py @@ -330,7 +330,12 @@ class Node: logging.debug(f'p.set_owner.long_name:{p.set_owner.long_name}:') logging.debug(f'p.set_owner.short_name:{p.set_owner.short_name}:') logging.debug(f'p.set_owner.is_licensed:{p.set_owner.is_licensed}') - return self._sendAdmin(p) + # If sending to a remote node, wait for ACK/NAK + if self == self.iface.localNode: + onResponse = None + else: + onResponse = self.onAckNak + return self._sendAdmin(p, onResponse=onResponse) def getURL(self, includeAll: bool = True): """The sharable URL that describes the current channel""" @@ -448,7 +453,12 @@ class Node: p.set_canned_message_module_messages = chunk logging.debug(f"Setting canned message '{chunk}' part {i+1}") - self._sendAdmin(p) + # If sending to a remote node, wait for ACK/NAK + if self == self.iface.localNode: + onResponse = None + else: + onResponse = self.onAckNak + return self._sendAdmin(p, onResponse=onResponse) def exitSimulator(self): """Tell a simulator node to exit (this message @@ -465,7 +475,12 @@ class Node: p.reboot_seconds = secs logging.info(f"Telling node to reboot in {secs} seconds") - return self._sendAdmin(p) + # If sending to a remote node, wait for ACK/NAK + if self == self.iface.localNode: + onResponse = None + else: + onResponse = self.onAckNak + return self._sendAdmin(p, onResponse=onResponse) def rebootOTA(self, secs: int = 10): """Tell the node to reboot into factory firmware.""" @@ -473,7 +488,12 @@ class Node: p.reboot_ota_seconds = secs logging.info(f"Telling node to reboot to OTA in {secs} seconds") - return self._sendAdmin(p) + # If sending to a remote node, wait for ACK/NAK + if self == self.iface.localNode: + onResponse = None + else: + onResponse = self.onAckNak + return self._sendAdmin(p, onResponse=onResponse) def shutdown(self, secs: int = 10): """Tell the node to shutdown.""" @@ -481,7 +501,12 @@ class Node: p.shutdown_seconds = secs logging.info(f"Telling node to shutdown in {secs} seconds") - return self._sendAdmin(p) + # If sending to a remote node, wait for ACK/NAK + if self == self.iface.localNode: + onResponse = None + else: + onResponse = self.onAckNak + return self._sendAdmin(p, onResponse=onResponse) def getMetadata(self, secs: int = 10): """Tell the node to shutdown.""" @@ -497,7 +522,12 @@ class Node: p.factory_reset = True logging.info(f"Telling node to factory reset") - return self._sendAdmin(p) + # If sending to a remote node, wait for ACK/NAK + if self == self.iface.localNode: + onResponse = None + else: + onResponse = self.onAckNak + return self._sendAdmin(p, onResponse=onResponse) def resetNodeDb(self): """Tell the node to reset its list of nodes.""" @@ -505,7 +535,12 @@ class Node: p.nodedb_reset = True logging.info(f"Telling node to reset the NodeDB") - return self._sendAdmin(p) + # If sending to a remote node, wait for ACK/NAK + if self == self.iface.localNode: + onResponse = None + else: + onResponse = self.onAckNak + return self._sendAdmin(p, onResponse=onResponse) def _fixupChannels(self): """Fixup indexes and add disabled channels as needed""" @@ -589,6 +624,18 @@ class Node: else: self._requestChannel(index + 1) + def onAckNak(self, p): + if p["decoded"]["routing"]["errorReason"] != "NONE": + print(f'Received a NAK, error reason: {p["decoded"]["routing"]["errorReason"]}') + self.iface._acknowledgment.receivedNak = True + else: + if int(p["from"]) == self.iface.localNode.nodeNum: + print(f'Received an implicit ACK. Packet will likely arrive, but cannot be guaranteed.') + self.iface._acknowledgment.receivedImplAck = True + else: + print(f'Received an ACK.') + self.iface._acknowledgment.receivedAck = True + def _requestChannel(self, channelNum: int): """Done with initial config messages, now send regular MeshPackets to ask for settings""" diff --git a/meshtastic/util.py b/meshtastic/util.py index 9ab9c80..f83a4f2 100644 --- a/meshtastic/util.py +++ b/meshtastic/util.py @@ -159,6 +159,27 @@ class Timeout: time.sleep(self.sleepInterval) return False + def waitForAckNak(self, acknowledgment, attrs=('receivedAck', 'receivedNak', 'receivedImplAck')): + """Block until an ACK or NAK has been received. Returns True if ACK or NAK has been received.""" + self.reset() + while time.time() < self.expireTime: + if any(map(lambda a: getattr(acknowledgment, a, None), attrs)): + acknowledgment.reset() + return True + time.sleep(self.sleepInterval) + return False + +class Acknowledgment: + "A class that records which type of acknowledgment was just received, if any." + def __init__(self): + self.receivedAck = False + self.receivedNak = False + self.receivedImplAck = False + + def reset(self): + self.receivedAck = False + self.receivedNak = False + self.receivedImplAck = False class DeferredExecution(): """A thread that accepts closures to run, and runs them as they are received"""