"""With two radios connected serially, send and receive test messages and report back if successful. """ import logging import sys import time import traceback import io from typing import List, Optional from dotmap import DotMap # type: ignore[import-untyped] from pubsub import pub # type: ignore[import-untyped] import meshtastic.util from meshtastic import BROADCAST_NUM from meshtastic.serial_interface import SerialInterface from meshtastic.tcp_interface import TCPInterface """The interfaces we are using for our tests""" interfaces: List = [] """A list of all packets we received while the current test was running""" receivedPackets: Optional[List] = None testsRunning: bool = False testNumber: int = 0 sendingInterface = None logger = logging.getLogger(__name__) def onReceive(packet, interface) -> None: """Callback invoked when a packet arrives""" if sendingInterface == interface: pass # print("Ignoring sending interface") else: # print(f"From {interface.stream.port}: {packet}") p = DotMap(packet) if p.decoded.portnum == "TEXT_MESSAGE_APP": # We only care a about clear text packets if receivedPackets is not None: receivedPackets.append(p) def onNode(node) -> None: """Callback invoked when the node DB changes""" print(f"Node changed: {node}") def subscribe() -> None: """Subscribe to the topics the user probably wants to see, prints output to stdout""" pub.subscribe(onNode, "meshtastic.node") def testSend( fromInterface, toInterface, isBroadcast: bool=False, asBinary: bool=False, wantAck: bool=False ) -> bool: """ Sends one test packet between two nodes and then returns success or failure Arguments: fromInterface {[type]} -- [description] toInterface {[type]} -- [description] Returns: boolean -- True for success """ # pylint: disable=W0603 global receivedPackets receivedPackets = [] fromNode = fromInterface.myInfo.my_node_num if isBroadcast: toNode = BROADCAST_NUM else: toNode = toInterface.myInfo.my_node_num logger.debug(f"Sending test wantAck={wantAck} packet from {fromNode} to {toNode}") # pylint: disable=W0603 global sendingInterface sendingInterface = fromInterface if not asBinary: fromInterface.sendText(f"Test {testNumber}", toNode, wantAck=wantAck) else: fromInterface.sendData( (f"Binary {testNumber}").encode("utf-8"), toNode, wantAck=wantAck ) for _ in range(60): # max of 60 secs before we timeout time.sleep(1) if len(receivedPackets) >= 1: return True return False # Failed to send def runTests(numTests: int=50, wantAck: bool=False, maxFailures: int=0) -> bool: """Run the tests.""" logger.info(f"Running {numTests} tests with wantAck={wantAck}") numFail: int = 0 numSuccess: int = 0 for _ in range(numTests): # pylint: disable=W0603 global testNumber testNumber = testNumber + 1 isBroadcast:bool = True # asBinary=(i % 2 == 0) success = testSend( interfaces[0], interfaces[1], isBroadcast, asBinary=False, wantAck=wantAck ) if not success: numFail = numFail + 1 logger.error( f"Test {testNumber} failed, expected packet not received ({numFail} failures so far)" ) else: numSuccess = numSuccess + 1 logger.info( f"Test {testNumber} succeeded {numSuccess} successes {numFail} failures so far" ) time.sleep(1) if numFail > maxFailures: logger.error("Too many failures! Test failed!") return False return True def testThread(numTests=50) -> bool: """Test thread""" logger.info("Found devices, starting tests...") result: bool = runTests(numTests, wantAck=True) if result: # Run another test # Allow a few dropped packets result = runTests(numTests, wantAck=False, maxFailures=1) return result def onConnection(topic=pub.AUTO_TOPIC) -> None: """Callback invoked when we connect/disconnect from a radio""" print(f"Connection changed: {topic.getName()}") def openDebugLog(portName) -> io.TextIOWrapper: """Open the debug log file""" debugname = "log" + portName.replace("/", "_") logger.info(f"Writing serial debugging to {debugname}") return open(debugname, "w+", buffering=1, encoding="utf8") def testAll(numTests: int=5) -> bool: """ Run a series of tests using devices we can find. This is called from the cli with the "--test" option. """ ports: List[str] = meshtastic.util.findPorts(True) if len(ports) < 2: meshtastic.util.our_exit( "Warning: Must have at least two devices connected to USB." ) pub.subscribe(onConnection, "meshtastic.connection") pub.subscribe(onReceive, "meshtastic.receive") # pylint: disable=W0603 global interfaces interfaces = list( map( lambda port: SerialInterface( port, debugOut=openDebugLog(port), connectNow=True ), ports, ) ) logger.info("Ports opened, starting test") result: bool = testThread(numTests) for i in interfaces: i.close() return result def testSimulator() -> None: """ Assume that someone has launched meshtastic-native as a simulated node. Talk to that node over TCP, do some operations and if they are successful exit the process with a success code, else exit with a non zero exit code. Run with python3 -c 'from meshtastic.test import testSimulator; testSimulator()' """ logging.basicConfig(level=logging.DEBUG) logger.info("Connecting to simulator on localhost!") try: iface: meshtastic.tcp_interface.TCPInterface = TCPInterface("localhost") iface.showInfo() iface.localNode.showInfo() iface.localNode.exitSimulator() iface.close() logger.info("Integration test successful!") except: print("Error while testing simulator:", sys.exc_info()[0]) traceback.print_exc() sys.exit(1) sys.exit(0)