mirror of
https://github.com/meshtastic/python.git
synced 2026-04-17 21:42:20 -04:00
if a test fails exit the entire framework
This commit is contained in:
@@ -192,6 +192,12 @@ class MeshInterface:
|
||||
pub.sendMessage("meshtastic.receive.user",
|
||||
packet=asObj)
|
||||
if meshPacket.payload.HasField("data"):
|
||||
|
||||
# For text messages, we go ahead and decode the text to ascii for our users
|
||||
# if asObj.payload.data.typ == "CLEAR_TEXT":
|
||||
# asObj.payload.data.text = asObj.payload.data.payload.decode(
|
||||
# "utf-8")
|
||||
|
||||
pub.sendMessage("meshtastic.receive.data",
|
||||
packet=asObj)
|
||||
|
||||
|
||||
@@ -3,6 +3,7 @@ from . import util
|
||||
from . import StreamInterface
|
||||
from pubsub import pub
|
||||
import time
|
||||
import sys
|
||||
import threading
|
||||
|
||||
"""The interfaces we are using for our tests"""
|
||||
@@ -21,7 +22,7 @@ def onReceive(packet):
|
||||
print(f"Received: {packet}")
|
||||
if packet.payload.data.typ == "CLEAR_TEXT":
|
||||
# We only care a about clear text packets
|
||||
receivedPackets.extend(packet)
|
||||
receivedPackets.append(packet)
|
||||
|
||||
|
||||
def onNode(node):
|
||||
@@ -50,24 +51,38 @@ def testSend(fromInterface, toInterface):
|
||||
receivedPackets = []
|
||||
fromNode = fromInterface.myInfo.my_node_num
|
||||
toNode = toInterface.myInfo.my_node_num
|
||||
|
||||
# FIXME, hack to test broadcast
|
||||
toNode = 255
|
||||
|
||||
logging.info(f"Sending test packet from {fromNode} to {toNode}")
|
||||
fromInterface.sendText(f"Test {testNumber}", toNode)
|
||||
time.sleep(10)
|
||||
if (len(receivedPackets) < 1):
|
||||
logging.error("Test failed, expected packet not received")
|
||||
return True
|
||||
else:
|
||||
logging.info("Test succeeded")
|
||||
return False
|
||||
return (len(receivedPackets) == 1)
|
||||
|
||||
|
||||
def testThread():
|
||||
logging.info("Found devices, starting tests...")
|
||||
numFail = 0
|
||||
numSuccess = 0
|
||||
while True:
|
||||
global testNumber
|
||||
testNumber = testNumber + 1
|
||||
testSend(interfaces[0], interfaces[1])
|
||||
time.sleep(10)
|
||||
success = testSend(interfaces[0], interfaces[1])
|
||||
if not success:
|
||||
numFail = numFail + 1
|
||||
logging.error(
|
||||
f"Test failed, expected packet not received ({numFail} failures so far)")
|
||||
else:
|
||||
numSuccess = numSuccess + 1
|
||||
logging.info(f"Test succeeded ({numSuccess} successes so far)")
|
||||
|
||||
if numFail >= 2 and numSuccess >= 2:
|
||||
for i in interfaces:
|
||||
i.close()
|
||||
return
|
||||
|
||||
time.sleep(5)
|
||||
|
||||
|
||||
def onConnection(topic=pub.AUTO_TOPIC):
|
||||
|
||||
Reference in New Issue
Block a user