From cb936697403fc09182d345e7ac0efa09e11cc580 Mon Sep 17 00:00:00 2001 From: Pavel Boldin Date: Sat, 8 Oct 2022 16:52:27 +0300 Subject: [PATCH] python: add QueueStatus support This makes Python API parse QueueStatus packages returned by the firmware allowing for TX Queue status control from the phone or PC. This makes it easier to not overwhelm device with packages that are going to be ignored anyways. Signed-off-by: Pavel Boldin --- meshtastic/mesh_interface.py | 71 +++++++++++++++++++++++++++++++++++- 1 file changed, 70 insertions(+), 1 deletion(-) diff --git a/meshtastic/mesh_interface.py b/meshtastic/mesh_interface.py index 10633bd..3301c62 100644 --- a/meshtastic/mesh_interface.py +++ b/meshtastic/mesh_interface.py @@ -4,6 +4,7 @@ import sys import random import time import logging +import collections from typing import AnyStr import threading from datetime import datetime @@ -55,6 +56,8 @@ class MeshInterface: self.configId = None self.gotResponse = False # used in gpio read self.mask = None # used in gpio read and gpio watch + self.queueStatus = None + self.queue = collections.OrderedDict() def close(self): """Shutdown this interface""" @@ -505,13 +508,61 @@ class MeshInterface: m.disconnect = True self._sendToRadio(m) + def _queueHasFreeSpace(self): + # We never got queueStatus, maybe the firmware is old + if self.queueStatus is None: + return True + return self.queueStatus.free > 0 + + def _queueClaim(self): + if self.queueStatus is None: + return + self.queueStatus.free -= 1 + def _sendToRadio(self, toRadio): """Send a ToRadio protobuf to the device""" if self.noProto: logging.warning(f"Not sending packet because protocol use is disabled by noProto") else: #logging.debug(f"Sending toRadio: {stripnl(toRadio)}") - self._sendToRadioImpl(toRadio) + + if not toRadio.HasField('packet'): + # not a meshpacket -- send immediately, give queue a chance, + # this makes heartbeat trigger queue + self._sendToRadioImpl(toRadio) + else: + # meshpacket -- queue + self.queue[toRadio.packet.id] = toRadio + + resentQueue = collections.OrderedDict() + + while self.queue: + #logging.warn("queue: " + " ".join(f'{k:08x}' for k in self.queue)) + while not self._queueHasFreeSpace(): + logging.debug("Waiting for free space in TX Queue") + time.sleep(0.5) + try: + toResend = self.queue.popitem(last=False) + except KeyError: + break + packetId, packet = toResend + #logging.warn(f"packet: {packetId:08x} {packet}") + resentQueue[packetId] = packet + if packet is False: + continue + self._queueClaim() + if packet != toRadio: + logging.debug(f"Resending packet ID {packetId:08x} {packet}") + self._sendToRadioImpl(packet) + + #logging.warn("resentQueue: " + " ".join(f'{k:08x}' for k in resentQueue)) + for packetId, packet in resentQueue.items(): + if self.queue.pop(packetId, False) is False: # Packet got acked under us + logging.debug(f"packet {packetId:08x} got acked under us") + continue + if packet: + self.queue[packetId] = packet + #logging.warn("queue + resentQueue: " + " ".join(f'{k:08x}' for k in self.queue)) def _sendToRadioImpl(self, toRadio): """Send a ToRadio protobuf to the device""" @@ -524,6 +575,21 @@ class MeshInterface: """ self.localNode.requestChannels() + def _handleQueueStatusFromRadio(self, queueStatus): + self.queueStatus = queueStatus + logging.debug(f"TX QUEUE free {queueStatus.free} of {queueStatus.maxlen}, res = {queueStatus.res}, id = {queueStatus.mesh_packet_id:08x} ") + + if queueStatus.res: + return + + #logging.warn("queue: " + " ".join(f'{k:08x}' for k in self.queue)) + justQueued = self.queue.pop(queueStatus.mesh_packet_id, None) + + if justQueued is None and queueStatus.mesh_packet_id != 0: + self.queue[queueStatus.mesh_packet_id] = False + logging.debug(f"Reply for unexpected packet ID {queueStatus.mesh_packet_id:08x}") + #logging.warn("queue: " + " ".join(f'{k:08x}' for k in self.queue)) + def _handleFromRadio(self, fromRadioBytes): """ Handle a packet that arrived from the radio(update model and publish events) @@ -580,6 +646,9 @@ class MeshInterface: elif fromRadio.HasField("packet"): self._handlePacketFromRadio(fromRadio.packet) + elif fromRadio.HasField('queueStatus'): + self._handleQueueStatusFromRadio(fromRadio.queueStatus) + elif fromRadio.rebooted: # Tell clients the device went away. Careful not to call the overridden # subclass version that closes the serial port