mirror of
https://github.com/meshtastic/python.git
synced 2025-12-30 03:17:54 -05:00
python: add QueueStatus support
This makes Python API parse QueueStatus packages returned by the firmware allowing for TX Queue status control from the phone or PC. This makes it easier to not overwhelm device with packages that are going to be ignored anyways. Signed-off-by: Pavel Boldin <pavel.b@techspark.engineering>
This commit is contained in:
committed by
Thomas Göttgens
parent
4980a02ef6
commit
cb93669740
@@ -4,6 +4,7 @@ import sys
|
||||
import random
|
||||
import time
|
||||
import logging
|
||||
import collections
|
||||
from typing import AnyStr
|
||||
import threading
|
||||
from datetime import datetime
|
||||
@@ -55,6 +56,8 @@ class MeshInterface:
|
||||
self.configId = None
|
||||
self.gotResponse = False # used in gpio read
|
||||
self.mask = None # used in gpio read and gpio watch
|
||||
self.queueStatus = None
|
||||
self.queue = collections.OrderedDict()
|
||||
|
||||
def close(self):
|
||||
"""Shutdown this interface"""
|
||||
@@ -505,13 +508,61 @@ class MeshInterface:
|
||||
m.disconnect = True
|
||||
self._sendToRadio(m)
|
||||
|
||||
def _queueHasFreeSpace(self):
|
||||
# We never got queueStatus, maybe the firmware is old
|
||||
if self.queueStatus is None:
|
||||
return True
|
||||
return self.queueStatus.free > 0
|
||||
|
||||
def _queueClaim(self):
|
||||
if self.queueStatus is None:
|
||||
return
|
||||
self.queueStatus.free -= 1
|
||||
|
||||
def _sendToRadio(self, toRadio):
|
||||
"""Send a ToRadio protobuf to the device"""
|
||||
if self.noProto:
|
||||
logging.warning(f"Not sending packet because protocol use is disabled by noProto")
|
||||
else:
|
||||
#logging.debug(f"Sending toRadio: {stripnl(toRadio)}")
|
||||
self._sendToRadioImpl(toRadio)
|
||||
|
||||
if not toRadio.HasField('packet'):
|
||||
# not a meshpacket -- send immediately, give queue a chance,
|
||||
# this makes heartbeat trigger queue
|
||||
self._sendToRadioImpl(toRadio)
|
||||
else:
|
||||
# meshpacket -- queue
|
||||
self.queue[toRadio.packet.id] = toRadio
|
||||
|
||||
resentQueue = collections.OrderedDict()
|
||||
|
||||
while self.queue:
|
||||
#logging.warn("queue: " + " ".join(f'{k:08x}' for k in self.queue))
|
||||
while not self._queueHasFreeSpace():
|
||||
logging.debug("Waiting for free space in TX Queue")
|
||||
time.sleep(0.5)
|
||||
try:
|
||||
toResend = self.queue.popitem(last=False)
|
||||
except KeyError:
|
||||
break
|
||||
packetId, packet = toResend
|
||||
#logging.warn(f"packet: {packetId:08x} {packet}")
|
||||
resentQueue[packetId] = packet
|
||||
if packet is False:
|
||||
continue
|
||||
self._queueClaim()
|
||||
if packet != toRadio:
|
||||
logging.debug(f"Resending packet ID {packetId:08x} {packet}")
|
||||
self._sendToRadioImpl(packet)
|
||||
|
||||
#logging.warn("resentQueue: " + " ".join(f'{k:08x}' for k in resentQueue))
|
||||
for packetId, packet in resentQueue.items():
|
||||
if self.queue.pop(packetId, False) is False: # Packet got acked under us
|
||||
logging.debug(f"packet {packetId:08x} got acked under us")
|
||||
continue
|
||||
if packet:
|
||||
self.queue[packetId] = packet
|
||||
#logging.warn("queue + resentQueue: " + " ".join(f'{k:08x}' for k in self.queue))
|
||||
|
||||
def _sendToRadioImpl(self, toRadio):
|
||||
"""Send a ToRadio protobuf to the device"""
|
||||
@@ -524,6 +575,21 @@ class MeshInterface:
|
||||
"""
|
||||
self.localNode.requestChannels()
|
||||
|
||||
def _handleQueueStatusFromRadio(self, queueStatus):
|
||||
self.queueStatus = queueStatus
|
||||
logging.debug(f"TX QUEUE free {queueStatus.free} of {queueStatus.maxlen}, res = {queueStatus.res}, id = {queueStatus.mesh_packet_id:08x} ")
|
||||
|
||||
if queueStatus.res:
|
||||
return
|
||||
|
||||
#logging.warn("queue: " + " ".join(f'{k:08x}' for k in self.queue))
|
||||
justQueued = self.queue.pop(queueStatus.mesh_packet_id, None)
|
||||
|
||||
if justQueued is None and queueStatus.mesh_packet_id != 0:
|
||||
self.queue[queueStatus.mesh_packet_id] = False
|
||||
logging.debug(f"Reply for unexpected packet ID {queueStatus.mesh_packet_id:08x}")
|
||||
#logging.warn("queue: " + " ".join(f'{k:08x}' for k in self.queue))
|
||||
|
||||
def _handleFromRadio(self, fromRadioBytes):
|
||||
"""
|
||||
Handle a packet that arrived from the radio(update model and publish events)
|
||||
@@ -580,6 +646,9 @@ class MeshInterface:
|
||||
elif fromRadio.HasField("packet"):
|
||||
self._handlePacketFromRadio(fromRadio.packet)
|
||||
|
||||
elif fromRadio.HasField('queueStatus'):
|
||||
self._handleQueueStatusFromRadio(fromRadio.queueStatus)
|
||||
|
||||
elif fromRadio.rebooted:
|
||||
# Tell clients the device went away. Careful not to call the overridden
|
||||
# subclass version that closes the serial port
|
||||
|
||||
Reference in New Issue
Block a user