diff --git a/meshtastic/mesh_interface.py b/meshtastic/mesh_interface.py index 10633bd..3301c62 100644 --- a/meshtastic/mesh_interface.py +++ b/meshtastic/mesh_interface.py @@ -4,6 +4,7 @@ import sys import random import time import logging +import collections from typing import AnyStr import threading from datetime import datetime @@ -55,6 +56,8 @@ class MeshInterface: self.configId = None self.gotResponse = False # used in gpio read self.mask = None # used in gpio read and gpio watch + self.queueStatus = None + self.queue = collections.OrderedDict() def close(self): """Shutdown this interface""" @@ -505,13 +508,61 @@ class MeshInterface: m.disconnect = True self._sendToRadio(m) + def _queueHasFreeSpace(self): + # We never got queueStatus, maybe the firmware is old + if self.queueStatus is None: + return True + return self.queueStatus.free > 0 + + def _queueClaim(self): + if self.queueStatus is None: + return + self.queueStatus.free -= 1 + def _sendToRadio(self, toRadio): """Send a ToRadio protobuf to the device""" if self.noProto: logging.warning(f"Not sending packet because protocol use is disabled by noProto") else: #logging.debug(f"Sending toRadio: {stripnl(toRadio)}") - self._sendToRadioImpl(toRadio) + + if not toRadio.HasField('packet'): + # not a meshpacket -- send immediately, give queue a chance, + # this makes heartbeat trigger queue + self._sendToRadioImpl(toRadio) + else: + # meshpacket -- queue + self.queue[toRadio.packet.id] = toRadio + + resentQueue = collections.OrderedDict() + + while self.queue: + #logging.warn("queue: " + " ".join(f'{k:08x}' for k in self.queue)) + while not self._queueHasFreeSpace(): + logging.debug("Waiting for free space in TX Queue") + time.sleep(0.5) + try: + toResend = self.queue.popitem(last=False) + except KeyError: + break + packetId, packet = toResend + #logging.warn(f"packet: {packetId:08x} {packet}") + resentQueue[packetId] = packet + if packet is False: + continue + self._queueClaim() + if packet != toRadio: + logging.debug(f"Resending packet ID {packetId:08x} {packet}") + self._sendToRadioImpl(packet) + + #logging.warn("resentQueue: " + " ".join(f'{k:08x}' for k in resentQueue)) + for packetId, packet in resentQueue.items(): + if self.queue.pop(packetId, False) is False: # Packet got acked under us + logging.debug(f"packet {packetId:08x} got acked under us") + continue + if packet: + self.queue[packetId] = packet + #logging.warn("queue + resentQueue: " + " ".join(f'{k:08x}' for k in self.queue)) def _sendToRadioImpl(self, toRadio): """Send a ToRadio protobuf to the device""" @@ -524,6 +575,21 @@ class MeshInterface: """ self.localNode.requestChannels() + def _handleQueueStatusFromRadio(self, queueStatus): + self.queueStatus = queueStatus + logging.debug(f"TX QUEUE free {queueStatus.free} of {queueStatus.maxlen}, res = {queueStatus.res}, id = {queueStatus.mesh_packet_id:08x} ") + + if queueStatus.res: + return + + #logging.warn("queue: " + " ".join(f'{k:08x}' for k in self.queue)) + justQueued = self.queue.pop(queueStatus.mesh_packet_id, None) + + if justQueued is None and queueStatus.mesh_packet_id != 0: + self.queue[queueStatus.mesh_packet_id] = False + logging.debug(f"Reply for unexpected packet ID {queueStatus.mesh_packet_id:08x}") + #logging.warn("queue: " + " ".join(f'{k:08x}' for k in self.queue)) + def _handleFromRadio(self, fromRadioBytes): """ Handle a packet that arrived from the radio(update model and publish events) @@ -580,6 +646,9 @@ class MeshInterface: elif fromRadio.HasField("packet"): self._handlePacketFromRadio(fromRadio.packet) + elif fromRadio.HasField('queueStatus'): + self._handleQueueStatusFromRadio(fromRadio.queueStatus) + elif fromRadio.rebooted: # Tell clients the device went away. Careful not to call the overridden # subclass version that closes the serial port