Files
python/meshtastic/test.py
2025-08-06 18:21:32 +02:00

213 lines
6.2 KiB
Python

"""With two radios connected serially, send and receive test
messages and report back if successful.
"""
import logging
import sys
import time
import traceback
import io
from typing import List, Optional
from dotmap import DotMap # type: ignore[import-untyped]
from pubsub import pub # type: ignore[import-untyped]
import meshtastic.util
from meshtastic import BROADCAST_NUM
from meshtastic.serial_interface import SerialInterface
from meshtastic.tcp_interface import TCPInterface
"""The interfaces we are using for our tests"""
# Populated by testAll() with one SerialInterface per detected port;
# runTests() sends from interfaces[0] and passes interfaces[1] as the receiver.
interfaces: List = []
"""A list of all packets we received while the current test was running"""
# None until testSend() resets it to an empty list at the start of a test;
# onReceive() appends to it only when it is not None.
receivedPackets: Optional[List] = None
# NOTE(review): not read or written by any function visible in this file.
testsRunning: bool = False
# Sequence number embedded in each test payload; incremented by runTests().
testNumber: int = 0
# Interface that sent the current test packet (set by testSend());
# onReceive() ignores packets reported on this interface.
sendingInterface = None
logger = logging.getLogger(__name__)
def onReceive(packet, interface) -> None:
    """Callback invoked when a packet arrives.

    Packets reported on the interface that sent the current test message are
    ignored. Any other packet is wrapped in a DotMap and, when its decoded
    portnum is "TEXT_MESSAGE_APP", appended to the module-level
    receivedPackets list (provided a test has initialised that list).
    """
    if sendingInterface == interface:
        # The sending radio's own reports do not count as a delivery.
        return

    wrapped = DotMap(packet)
    # We only care about clear text packets
    isTextMessage = wrapped.decoded.portnum == "TEXT_MESSAGE_APP"
    if isTextMessage and receivedPackets is not None:
        receivedPackets.append(wrapped)
def onNode(node) -> None:
    """Callback invoked when the node DB changes; prints the node to stdout."""
    message = f"Node changed: {node}"
    print(message)
def subscribe() -> None:
    """Register onNode as the listener for the "meshtastic.node" topic.

    onNode prints every node change it is told about to stdout.
    """
    nodeTopic = "meshtastic.node"
    pub.subscribe(onNode, nodeTopic)
def testSend(
    fromInterface, toInterface, isBroadcast: bool=False, asBinary: bool=False, wantAck: bool=False
) -> bool:
    """
    Sends one test packet between two nodes and then returns success or failure

    Arguments:
        fromInterface -- interface the packet is sent from; it is also recorded
            in the module-level sendingInterface
        toInterface -- interface whose node number is the destination when
            isBroadcast is False
        isBroadcast -- send to BROADCAST_NUM instead of toInterface's node
        asBinary -- send "Binary <testNumber>" via sendData() instead of
            "Test <testNumber>" via sendText()
        wantAck -- forwarded unchanged to sendText()/sendData()

    Returns:
        boolean -- True as soon as receivedPackets holds at least one packet,
        False if nothing was collected after 60 one-second polls
    """
    # pylint: disable=W0603
    global receivedPackets, sendingInterface

    # Start from an empty list so packets from earlier tests are not counted.
    receivedPackets = []
    fromNode = fromInterface.myInfo.my_node_num
    toNode = BROADCAST_NUM if isBroadcast else toInterface.myInfo.my_node_num
    logger.debug(f"Sending test wantAck={wantAck} packet from {fromNode} to {toNode}")

    sendingInterface = fromInterface
    if asBinary:
        payload = (f"Binary {testNumber}").encode("utf-8")
        fromInterface.sendData(payload, toNode, wantAck=wantAck)
    else:
        fromInterface.sendText(f"Test {testNumber}", toNode, wantAck=wantAck)

    # Poll once per second; max of 60 secs before we timeout
    pollsLeft = 60
    while pollsLeft > 0:
        time.sleep(1)
        pollsLeft -= 1
        if len(receivedPackets) > 0:
            return True
    return False  # Failed to send
def runTests(numTests: int=50, wantAck: bool=False, maxFailures: int=0) -> bool:
    """Run numTests send/receive tests and report whether enough succeeded.

    Each iteration increments the module-level testNumber, sends one broadcast
    text message from interfaces[0] via testSend(), logs the outcome and then
    sleeps for one second.

    Arguments:
        numTests -- how many messages to send
        wantAck -- forwarded to testSend()
        maxFailures -- largest number of failed sends that still counts as a pass

    Returns:
        boolean -- False when more than maxFailures sends failed, else True
    """
    # pylint: disable=W0603
    global testNumber

    logger.info(f"Running {numTests} tests with wantAck={wantAck}")
    numFail: int = 0
    numSuccess: int = 0

    for _ in range(numTests):
        testNumber += 1
        # Every test is currently sent as a broadcast, clear-text message.
        delivered = testSend(
            interfaces[0], interfaces[1], True, asBinary=False, wantAck=wantAck
        )
        if delivered:
            numSuccess += 1
            logger.info(
                f"Test {testNumber} succeeded {numSuccess} successes {numFail} failures so far"
            )
        else:
            numFail += 1
            logger.error(
                f"Test {testNumber} failed, expected packet not received ({numFail} failures so far)"
            )
        time.sleep(1)

    withinBudget = numFail <= maxFailures
    if not withinBudget:
        logger.error("Too many failures! Test failed!")
    return withinBudget
def testThread(numTests=50) -> bool:
    """Run the acknowledged test pass and, if it succeeds, an unacknowledged one.

    The first pass uses wantAck=True and tolerates no failures. Only when it
    passes is a second pass run with wantAck=False, which allows one dropped
    packet. Returns the result of the last pass that was run.
    """
    logger.info("Found devices, starting tests...")
    ackResult: bool = runTests(numTests, wantAck=True)
    if not ackResult:
        return ackResult
    # Run another test
    # Allow a few dropped packets
    return runTests(numTests, wantAck=False, maxFailures=1)
def onConnection(topic=pub.AUTO_TOPIC) -> None:
    """Callback invoked when we connect/disconnect from a radio.

    Prints the name of the pubsub topic that triggered the call.
    """
    message = f"Connection changed: {topic.getName()}"
    print(message)
def openDebugLog(portName) -> io.TextIOWrapper:
    """Open the debug log file for a port and return the open handle.

    The file name is "log" followed by portName with every "/" replaced by
    "_". The file is opened for read/write (truncating any existing content),
    line-buffered, and is deliberately left open: the caller receives the
    handle and owns it.
    """
    debugname = "log" + "_".join(portName.split("/"))
    logger.info(f"Writing serial debugging to {debugname}")
    logHandle = open(debugname, "w+", buffering=1, encoding="utf8")
    return logHandle
def testAll(numTests: int=5) -> bool:
    """
    Run a series of tests using devices we can find.
    This is called from the cli with the "--test" option.

    Opens a SerialInterface (with a per-port debug log) for every port returned
    by meshtastic.util.findPorts(True), runs testThread(numTests) and closes
    every interface that was opened, even when opening a later port or the
    tests themselves raise.

    Arguments:
        numTests -- number of tests per pass, forwarded to testThread()

    Returns:
        boolean -- the result of testThread()
    """
    # pylint: disable=W0603
    global interfaces

    ports: List[str] = meshtastic.util.findPorts(True)
    if len(ports) < 2:
        # NOTE(review): our_exit presumably terminates the process - confirm.
        meshtastic.util.our_exit(
            "Warning: Must have at least two devices connected to USB."
        )

    pub.subscribe(onConnection, "meshtastic.connection")
    pub.subscribe(onReceive, "meshtastic.receive")

    # Fill the list one interface at a time so that the cleanup below also
    # covers ports that were already opened when a later one fails.
    interfaces = []
    try:
        for port in ports:
            interfaces.append(
                SerialInterface(port, debugOut=openDebugLog(port), connectNow=True)
            )
        logger.info("Ports opened, starting test")
        result: bool = testThread(numTests)
        return result
    finally:
        # Always release the serial ports, whether the tests passed or raised.
        for iface in interfaces:
            iface.close()
def testSimulator() -> None:
    """
    Assume that someone has launched meshtastic-native as a simulated node.
    Talk to that node over TCP, do some operations and if they are successful
    exit the process with a success code, else exit with a non zero exit code.
    Run with
    python3 -c 'from meshtastic.test import testSimulator; testSimulator()'

    This function never returns normally: it always ends in sys.exit().
    """
    logging.basicConfig(level=logging.DEBUG)
    logger.info("Connecting to simulator on localhost!")
    try:
        iface: TCPInterface = TCPInterface("localhost")
        iface.showInfo()
        iface.localNode.showInfo()
        iface.localNode.exitSimulator()
        iface.close()
        logger.info("Integration test successful!")
    except Exception:  # pylint: disable=broad-except
        # Top-level boundary: report any ordinary failure and turn it into a
        # non-zero exit code. KeyboardInterrupt and SystemExit are deliberately
        # not caught so Ctrl-C and explicit exits keep their own meaning.
        print("Error while testing simulator:", sys.exc_info()[0])
        traceback.print_exc()
        sys.exit(1)
    sys.exit(0)