Better support --ch-index for other commands (traceroute, telemetry, position)

This commit is contained in:
Ian McEwen
2024-04-24 17:18:17 -07:00
parent bb6e3b7a49
commit 4d10b6e1bd
2 changed files with 43 additions and 30 deletions

View File

@@ -20,6 +20,7 @@ from meshtastic import mt_config
from meshtastic import channel_pb2, config_pb2, portnums_pb2, remote_hardware, BROADCAST_ADDR
from meshtastic.version import get_active_version
from meshtastic.ble_interface import BLEInterface
from meshtastic.mesh_interface import MeshInterface
def onReceive(packet, interface):
"""Callback invoked when a packet arrives"""
@@ -56,6 +57,11 @@ def onConnection(interface, topic=pub.AUTO_TOPIC): # pylint: disable=W0613
"""Callback invoked when we connect/disconnect from a radio"""
print(f"Connection changed: {topic.getName()}")
def checkChannel(interface: MeshInterface, channelIndex: int) -> bool:
"""Given an interface and channel index, return True if that channel is non-disabled on the local node"""
ch = interface.localNode.getChannelByChannelIndex(channelIndex)
logging.debug(f"ch:{ch}")
return (ch and ch.role != channel_pb2.Channel.Role.DISABLED)
def getPref(node, comp_name):
"""Get a channel or preferences value"""
@@ -403,12 +409,8 @@ def onConnected(interface):
if args.sendtext:
closeNow = True
channelIndex = 0
if args.ch_index is not None:
channelIndex = int(args.ch_index)
ch = interface.localNode.getChannelByChannelIndex(channelIndex)
logging.debug(f"ch:{ch}")
if ch and ch.role != channel_pb2.Channel.Role.DISABLED:
channelIndex = mt_config.channel_index or 0
if checkChannel(interface, channelIndex):
print(
f"Sending text message {args.sendtext} to {args.dest} on channelIndex:{channelIndex}"
)
@@ -428,22 +430,28 @@ def onConnected(interface):
loraConfig = getattr(interface.localNode.localConfig, "lora")
hopLimit = getattr(loraConfig, "hop_limit")
dest = str(args.traceroute)
print(f"Sending traceroute request to {dest} (this could take a while)")
interface.sendTraceRoute(dest, hopLimit)
channelIndex = mt_config.channel_index or 0
if checkChannel(interface, channelIndex):
print(f"Sending traceroute request to {dest} on channelIndex:{channelIndex} (this could take a while)")
interface.sendTraceRoute(dest, hopLimit, channelIndex=channelIndex)
if args.request_telemetry:
if args.dest == BROADCAST_ADDR:
meshtastic.util.our_exit("Warning: Must use a destination node ID.")
else:
print(f"Sending telemetry request to {args.dest} (this could take a while)")
interface.sendTelemetry(destinationId=args.dest, wantResponse=True)
channelIndex = mt_config.channel_index or 0
if checkChannel(interface, channelIndex):
print(f"Sending telemetry request to {args.dest} on channelIndex:{channelIndex} (this could take a while)")
interface.sendTelemetry(destinationId=args.dest, wantResponse=True, channelIndex=channelIndex)
if args.request_position:
if args.dest == BROADCAST_ADDR:
meshtastic.util.our_exit("Warning: Must use a destination node ID.")
else:
print(f"Sending position request to {args.dest} (this could take a while)")
interface.sendPosition(destinationId=args.dest, wantResponse=True)
channelIndex = mt_config.channel_index or 0
if checkChannel(interface, channelIndex):
print(f"Sending position request to {args.dest} on channelIndex:{channelIndex} (this could take a while)")
interface.sendPosition(destinationId=args.dest, wantResponse=True, channelIndex=channelIndex)
if args.gpio_wrb or args.gpio_rd or args.gpio_watch:
if args.dest == BROADCAST_ADDR:

View File

@@ -351,6 +351,7 @@ class MeshInterface:
destinationId: Union[int, str]=BROADCAST_ADDR,
wantAck: bool=False,
wantResponse: bool=False,
channelIndex: int=0,
):
"""
Send a position packet to some other node (normally a broadcast)
@@ -393,6 +394,7 @@ class MeshInterface:
wantAck=wantAck,
wantResponse=wantResponse,
onResponse=onResponse,
channelIndex=channelIndex,
)
if wantResponse:
self.waitForPosition()
@@ -426,7 +428,7 @@ class MeshInterface:
if p["decoded"]["routing"]["errorReason"] == 'NO_RESPONSE':
our_exit("No response from node. At least firmware 2.1.22 is required on the destination node.")
def sendTraceRoute(self, dest: Union[int, str], hopLimit: int):
def sendTraceRoute(self, dest: Union[int, str], hopLimit: int, channelIndex: int=0):
"""Send the trace route"""
r = mesh_pb2.RouteDiscovery()
self.sendData(
@@ -435,6 +437,7 @@ class MeshInterface:
portNum=portnums_pb2.PortNum.TRACEROUTE_APP,
wantResponse=True,
onResponse=self.onResponseTraceRoute,
channelIndex=channelIndex,
)
# extend timeout based on number of nodes, limit by configured hopLimit
waitFactor = min(len(self.nodes) - 1 if self.nodes else 0, hopLimit)
@@ -456,26 +459,27 @@ class MeshInterface:
self._acknowledgment.receivedTraceRoute = True
def sendTelemetry(self, destinationId=BROADCAST_ADDR, wantResponse=False):
def sendTelemetry(self, destinationId: Union[int,str]=BROADCAST_ADDR, wantResponse: bool=False, channelIndex: int=0):
"""Send telemetry and optionally ask for a response"""
r = telemetry_pb2.Telemetry()
node = next(n for n in self.nodes.values() if n["num"] == self.localNode.nodeNum)
if node is not None:
metrics = node.get("deviceMetrics")
if metrics:
batteryLevel = metrics.get("batteryLevel")
if batteryLevel is not None:
r.device_metrics.battery_level = batteryLevel
voltage = metrics.get("voltage")
if voltage is not None:
r.device_metrics.voltage = voltage
channel_utilization = metrics.get("channelUtilization")
if channel_utilization is not None:
r.device_metrics.channel_utilization = channel_utilization
air_util_tx = metrics.get("airUtilTx")
if air_util_tx is not None:
r.device_metrics.air_util_tx = air_util_tx
if self.nodes is not None:
node = next(n for n in self.nodes.values() if n["num"] == self.localNode.nodeNum)
if node is not None:
metrics = node.get("deviceMetrics")
if metrics:
batteryLevel = metrics.get("batteryLevel")
if batteryLevel is not None:
r.device_metrics.battery_level = batteryLevel
voltage = metrics.get("voltage")
if voltage is not None:
r.device_metrics.voltage = voltage
channel_utilization = metrics.get("channelUtilization")
if channel_utilization is not None:
r.device_metrics.channel_utilization = channel_utilization
air_util_tx = metrics.get("airUtilTx")
if air_util_tx is not None:
r.device_metrics.air_util_tx = air_util_tx
if wantResponse:
onResponse = self.onResponseTelemetry
@@ -488,6 +492,7 @@ class MeshInterface:
portNum=portnums_pb2.PortNum.TELEMETRY_APP,
wantResponse=wantResponse,
onResponse=onResponse,
channelIndex=channelIndex,
)
if wantResponse:
self.waitForTelemetry()