# Mirror of https://github.com/meshtastic/python.git (synced 2025-12-24 08:27:55 -05:00; 213 lines, 6.2 KiB, Python)
"""With two radios connected serially, send and receive test
messages and report back if successful.
"""
|
|
import logging
|
|
import sys
|
|
import time
|
|
import traceback
|
|
import io
|
|
|
|
from typing import List, Optional
|
|
|
|
from dotmap import DotMap # type: ignore[import-untyped]
|
|
from pubsub import pub # type: ignore[import-untyped]
|
|
|
|
import meshtastic.util
|
|
from meshtastic import BROADCAST_NUM
|
|
from meshtastic.serial_interface import SerialInterface
|
|
from meshtastic.tcp_interface import TCPInterface
|
|
|
|
"""The interfaces we are using for our tests"""
|
|
interfaces: List = []

"""A list of all packets we received while the current test was running"""
receivedPackets: Optional[List] = None

# Not read or written by any function in the visible part of this module.
testsRunning: bool = False

# Running test counter: runTests() increments it before each send and
# testSend() embeds it in the payload text.
testNumber: int = 0

# The interface testSend() last transmitted from; onReceive() ignores packets
# delivered through it.
sendingInterface = None

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
|
|
|
|
def onReceive(packet, interface) -> None:
    """Record an incoming text-message packet for the test in progress.

    Packets delivered through the interface we are currently sending from are
    ignored. Any other packet is wrapped in a DotMap and, when its decoded
    portnum is TEXT_MESSAGE_APP and ``receivedPackets`` has been initialised,
    appended to ``receivedPackets``.

    Arguments:
        packet -- the received packet (anything DotMap accepts)
        interface -- the interface that delivered the packet
    """
    if sendingInterface == interface:
        # Delivered via the sending side; nothing to record.
        return

    received = DotMap(packet)

    # We only care about clear text packets
    isTextMessage = received.decoded.portnum == "TEXT_MESSAGE_APP"
    if isTextMessage and receivedPackets is not None:
        receivedPackets.append(received)
|
|
|
|
|
|
def onNode(node) -> None:
    """Print a one-line notice to stdout describing the node that changed.

    Arguments:
        node -- the changed node; it is formatted into the printed message
    """
    message = f"Node changed: {node}"
    print(message)
|
|
|
|
|
|
def subscribe() -> None:
    """Register onNode as a listener on the "meshtastic.node" pubsub topic.

    onNode prints every change it is notified about to stdout.
    """
    nodeTopic = "meshtastic.node"
    pub.subscribe(onNode, nodeTopic)
|
|
|
|
|
|
def testSend(
    fromInterface, toInterface, isBroadcast: bool=False, asBinary: bool=False, wantAck: bool=False
) -> bool:
    """
    Send one test packet from one interface and wait for a packet to be recorded.

    Arguments:
        fromInterface -- interface used to transmit; its myInfo.my_node_num is the sender
        toInterface -- interface whose myInfo.my_node_num is the destination
            (not consulted when isBroadcast is set)
        isBroadcast -- address the packet to BROADCAST_NUM instead of toInterface
        asBinary -- send "Binary <testNumber>" as UTF-8 bytes via sendData rather
            than "Test <testNumber>" via sendText
        wantAck -- passed through unchanged to sendText/sendData

    Returns:
        boolean -- True once receivedPackets holds at least one packet; False
        if 60 one-second polls pass without one
    """
    # pylint: disable=W0603
    global receivedPackets, sendingInterface

    # Start from an empty list so only packets from this test are counted.
    receivedPackets = []
    fromNode = fromInterface.myInfo.my_node_num
    toNode = BROADCAST_NUM if isBroadcast else toInterface.myInfo.my_node_num

    logger.debug(f"Sending test wantAck={wantAck} packet from {fromNode} to {toNode}")

    # Let onReceive know which interface to ignore.
    sendingInterface = fromInterface
    if asBinary:
        payload = (f"Binary {testNumber}").encode("utf-8")
        fromInterface.sendData(payload, toNode, wantAck=wantAck)
    else:
        fromInterface.sendText(f"Test {testNumber}", toNode, wantAck=wantAck)

    # Poll once per second, for at most 60 seconds.
    secondsLeft = 60
    while secondsLeft > 0:
        time.sleep(1)
        if receivedPackets:
            return True
        secondsLeft -= 1
    return False  # Nothing was received before the timeout
|
|
|
|
|
|
def runTests(numTests: int=50, wantAck: bool=False, maxFailures: int=0) -> bool:
    """Run numTests broadcast text sends from interfaces[0] and tally the results.

    Arguments:
        numTests -- how many sends to attempt
        wantAck -- forwarded to testSend for every send
        maxFailures -- the run fails when the number of failed sends exceeds this

    Returns:
        boolean -- True when the failure count stayed within maxFailures
    """
    # pylint: disable=W0603
    global testNumber

    logger.info(f"Running {numTests} tests with wantAck={wantAck}")
    failures: int = 0
    successes: int = 0
    for _ in range(numTests):
        testNumber += 1
        # Every send is currently a plain-text broadcast.
        # asBinary=(i % 2 == 0)
        broadcast: bool = True
        passed = testSend(
            interfaces[0], interfaces[1], broadcast, asBinary=False, wantAck=wantAck
        )
        if passed:
            successes += 1
            logger.info(
                f"Test {testNumber} succeeded {successes} successes {failures} failures so far"
            )
        else:
            failures += 1
            logger.error(
                f"Test {testNumber} failed, expected packet not received ({failures} failures so far)"
            )

        # Pause for a second between sends.
        time.sleep(1)

    if failures <= maxFailures:
        return True

    logger.error("Too many failures! Test failed!")
    return False
|
|
|
|
|
|
def testThread(numTests=50) -> bool:
    """Run the two test passes: first with acks, then without.

    The second pass (wantAck=False) is only run when the first one succeeds,
    and it tolerates one dropped packet.

    Arguments:
        numTests -- number of sends per pass

    Returns:
        boolean -- the result of the last pass that was run
    """
    logger.info("Found devices, starting tests...")
    ackedPass: bool = runTests(numTests, wantAck=True)
    if not ackedPass:
        return ackedPass

    # Run another test
    # Allow a few dropped packets
    return runTests(numTests, wantAck=False, maxFailures=1)
|
|
|
|
|
|
def onConnection(topic=pub.AUTO_TOPIC) -> None:
    """Print the name of the connection topic that fired.

    Subscribed to "meshtastic.connection" by testAll.

    Arguments:
        topic -- the pubsub topic object (defaults to pub.AUTO_TOPIC); its
            getName() result is printed
    """
    topicName = topic.getName()
    print(f"Connection changed: {topicName}")
|
|
|
|
|
|
def openDebugLog(portName) -> io.TextIOWrapper:
    """Open a line-buffered debug log file for a port and return it.

    The file name is "log" followed by the port name with every "/" replaced
    by "_". The file is opened with mode "w+" and left open; closing it is up
    to the caller.

    Arguments:
        portName -- name of the port the log is for
    """
    sanitized = portName.replace("/", "_")
    debugname = f"log{sanitized}"
    logger.info(f"Writing serial debugging to {debugname}")
    return open(debugname, mode="w+", buffering=1, encoding="utf8")
|
|
|
|
|
|
def testAll(numTests: int=5) -> bool:
    """
    Run a series of tests using devices we can find.
    This is called from the cli with the "--test" option.

    Opens a SerialInterface (with its own debug log) on every port reported by
    meshtastic.util.findPorts(True), runs testThread, and closes every
    interface that was opened, even if opening a later port or the test run
    itself raises.

    Arguments:
        numTests -- number of sends per test pass, forwarded to testThread

    Returns:
        boolean -- the result of testThread
    """
    # pylint: disable=W0603
    global interfaces

    ports: List[str] = meshtastic.util.findPorts(True)
    if len(ports) < 2:
        # NOTE(review): our_exit is presumably expected not to return -- confirm.
        meshtastic.util.our_exit(
            "Warning: Must have at least two devices connected to USB."
        )

    pub.subscribe(onConnection, "meshtastic.connection")
    pub.subscribe(onReceive, "meshtastic.receive")

    # Open the ports one at a time so a failure part-way through still leaves
    # the already-opened interfaces in `interfaces` for the cleanup below.
    interfaces = []
    try:
        for port in ports:
            interfaces.append(
                SerialInterface(port, debugOut=openDebugLog(port), connectNow=True)
            )

        logger.info("Ports opened, starting test")
        result: bool = testThread(numTests)
    finally:
        # Release the serial ports on success and on failure.
        for i in interfaces:
            i.close()

    return result
|
|
|
|
|
|
def testSimulator() -> None:
    """
    Assume that someone has launched meshtastic-native as a simulated node.
    Talk to that node over TCP, do some operations and if they are successful
    exit the process with a success code, else exit with a non zero exit code.

    Run with
    python3 -c 'from meshtastic.test import testSimulator; testSimulator()'

    This function never returns normally: it always ends in sys.exit(0) on
    success or sys.exit(1) after reporting an error.
    """
    logging.basicConfig(level=logging.DEBUG)
    logger.info("Connecting to simulator on localhost!")
    try:
        iface: meshtastic.tcp_interface.TCPInterface = TCPInterface("localhost")
        iface.showInfo()
        iface.localNode.showInfo()
        iface.localNode.exitSimulator()
        iface.close()
        logger.info("Integration test successful!")
    except Exception:
        # Catch Exception rather than using a bare except, so KeyboardInterrupt
        # and SystemExit propagate instead of being reported as test failures.
        print("Error while testing simulator:", sys.exc_info()[0])
        traceback.print_exc()
        sys.exit(1)
    sys.exit(0)
|