mirror of
https://github.com/meshtastic/python.git
synced 2026-01-03 21:37:57 -05:00
Merge pull request #150 from mkinney/more_work_on_tests
More work on tests
This commit is contained in:
@@ -23,7 +23,7 @@ ignore-patterns=mqtt_pb2.py,channel_pb2.py,environmental_measurement_pb2.py,admi
|
||||
# no Warning level messages displayed, use"--disable=all --enable=classes
|
||||
# --disable=W"
|
||||
#
|
||||
disable=invalid-name,fixme,logging-fstring-interpolation,too-many-statements,too-many-branches,too-many-locals,no-member,f-string-without-interpolation,protected-access,no-self-use,pointless-string-statement,too-few-public-methods,consider-using-f-string
|
||||
disable=invalid-name,fixme,logging-fstring-interpolation,too-many-statements,too-many-branches,too-many-locals,no-member,f-string-without-interpolation,protected-access,no-self-use,pointless-string-statement,too-few-public-methods,consider-using-f-string,broad-except,no-else-return,unused-argument,global-statement,global-variable-not-assigned,too-many-boolean-expressions,no-else-raise,bare-except
|
||||
|
||||
|
||||
[BASIC]
|
||||
|
||||
@@ -17,6 +17,7 @@ from .tcp_interface import TCPInterface
|
||||
from .ble_interface import BLEInterface
|
||||
from . import test, remote_hardware
|
||||
from . import portnums_pb2, channel_pb2, mesh_pb2, radioconfig_pb2
|
||||
from . import tunnel
|
||||
from .util import support_info, our_exit
|
||||
|
||||
"""We only import the tunnel code if we are on a platform that can run it"""
|
||||
@@ -162,6 +163,7 @@ def setPref(attributes, name, valStr):
|
||||
val = fromStr(valStr)
|
||||
|
||||
enumType = field.enum_type
|
||||
# pylint: disable=C0123
|
||||
if enumType and type(val) == str:
|
||||
# We've failed so far to convert this string into an enum, try to find it by reflection
|
||||
e = enumType.values_by_name.get(val)
|
||||
@@ -212,7 +214,7 @@ def onConnected(interface):
|
||||
alt = 0
|
||||
lat = 0.0
|
||||
lon = 0.0
|
||||
time = 0 # always set time, but based on the local clock
|
||||
timeval = 0 # always set time, but based on the local clock
|
||||
prefs = interface.localNode.radioConfig.preferences
|
||||
if args.setalt:
|
||||
alt = int(args.setalt)
|
||||
@@ -229,7 +231,7 @@ def onConnected(interface):
|
||||
|
||||
print("Setting device position")
|
||||
# can include lat/long/alt etc: latitude = 37.5, longitude = -122.1
|
||||
interface.sendPosition(lat, lon, alt, time)
|
||||
interface.sendPosition(lat, lon, alt, timeval)
|
||||
interface.localNode.writeConfig()
|
||||
elif not args.no_time:
|
||||
# We normally provide a current time to the mesh when we connect
|
||||
@@ -358,7 +360,7 @@ def onConnected(interface):
|
||||
getNode().writeConfig()
|
||||
|
||||
if args.configure:
|
||||
with open(args.configure[0]) as file:
|
||||
with open(args.configure[0], encoding='utf8') as file:
|
||||
configuration = yaml.safe_load(file)
|
||||
closeNow = True
|
||||
|
||||
@@ -374,7 +376,7 @@ def onConnected(interface):
|
||||
alt = 0
|
||||
lat = 0.0
|
||||
lon = 0.0
|
||||
time = 0 # always set time, but based on the local clock
|
||||
timeval = 0 # always set time, but based on the local clock
|
||||
prefs = interface.localNode.radioConfig.preferences
|
||||
|
||||
if 'alt' in configuration['location']:
|
||||
@@ -390,7 +392,7 @@ def onConnected(interface):
|
||||
prefs.fixed_position = True
|
||||
print(f"Fixing longitude at {lon} degrees")
|
||||
print("Setting device position")
|
||||
interface.sendPosition(lat, lon, alt, time)
|
||||
interface.sendPosition(lat, lon, alt, timeval)
|
||||
interface.localNode.writeConfig()
|
||||
|
||||
if 'user_prefs' in configuration:
|
||||
@@ -531,7 +533,6 @@ def onConnected(interface):
|
||||
print(qr.terminal())
|
||||
|
||||
if have_tunnel and args.tunnel:
|
||||
from . import tunnel
|
||||
# Even if others said we could close, stay open if the user asked for a tunnel
|
||||
closeNow = False
|
||||
tunnel.Tunnel(interface, subnet=args.tunnel_net)
|
||||
@@ -598,8 +599,9 @@ def common():
|
||||
'This option has been deprecated, see help below for the correct replacement...')
|
||||
parser.print_help(sys.stderr)
|
||||
sys.exit(1)
|
||||
elif args.test:
|
||||
result = test.testAll()
|
||||
elif args.numTests:
|
||||
numTests = int(args.numTests[0])
|
||||
result = test.testAll(numTests)
|
||||
if not result:
|
||||
our_exit("Warning: Test was not successful.")
|
||||
else:
|
||||
@@ -611,8 +613,10 @@ def common():
|
||||
logfile = None
|
||||
else:
|
||||
logging.info(f"Logging serial output to {args.seriallog}")
|
||||
# Note: using "line buffering"
|
||||
# pylint: disable=R1732
|
||||
logfile = open(args.seriallog, 'w+',
|
||||
buffering=1) # line buffering
|
||||
buffering=1, encoding='utf8')
|
||||
|
||||
subscribe()
|
||||
if args.ble:
|
||||
@@ -763,14 +767,16 @@ def initParser():
|
||||
"--setlon", help="Set device longitude (allows use without GPS)")
|
||||
|
||||
parser.add_argument(
|
||||
"--pos-fields", help="Specify position message fields. Use '0' for list of valid values. Can pass multiple values as a space separated list like this: '--pos-fields POS_ALTITUDE POS_ALT_MSL'",
|
||||
"--pos-fields", help="Specify position message fields. Use '0' for list of valid values. "\
|
||||
"Can pass multiple values as a space separated list like "\
|
||||
"this: '--pos-fields POS_ALTITUDE POS_ALT_MSL'",
|
||||
nargs="*", action="store")
|
||||
|
||||
parser.add_argument("--debug", help="Show API library debug log messages",
|
||||
action="store_true")
|
||||
|
||||
parser.add_argument("--test", help="Run stress test against all connected Meshtastic devices",
|
||||
action="store_true")
|
||||
nargs=1, dest='numTests', action="store")
|
||||
|
||||
parser.add_argument("--ble", help="BLE mac address to connect to (BLE is not yet supported for this tool)",
|
||||
default=None)
|
||||
|
||||
@@ -53,6 +53,8 @@ class MeshInterface:
|
||||
self.heartbeatTimer = None
|
||||
random.seed() # FIXME, we should not clobber the random seedval here, instead tell user they must call it
|
||||
self.currentPacketId = random.randint(0, 0xffffffff)
|
||||
self.nodesByNum = None
|
||||
self.configId = None
|
||||
|
||||
def close(self):
|
||||
"""Shutdown this interface"""
|
||||
@@ -540,12 +542,12 @@ class MeshInterface:
|
||||
asDict["raw"] = meshPacket
|
||||
|
||||
# from might be missing if the nodenum was zero.
|
||||
if not "from" in asDict:
|
||||
if "from" not in asDict:
|
||||
asDict["from"] = 0
|
||||
logging.error(
|
||||
f"Device returned a packet we sent, ignoring: {stripnl(asDict)}")
|
||||
return
|
||||
if not "to" in asDict:
|
||||
if "to" not in asDict:
|
||||
asDict["to"] = 0
|
||||
|
||||
# /add fromId and toId fields based on the node ID
|
||||
|
||||
@@ -21,6 +21,7 @@ class Node:
|
||||
self.radioConfig = None
|
||||
self.channels = None
|
||||
self._timeout = Timeout(maxSecs=60)
|
||||
self.partialChannels = None
|
||||
|
||||
def showChannels(self):
|
||||
"""Show human readable description of our channels"""
|
||||
@@ -165,8 +166,8 @@ class Node:
|
||||
for c in self.channels:
|
||||
if c.role == channel_pb2.Channel.Role.PRIMARY or (includeAll and c.role == channel_pb2.Channel.Role.SECONDARY):
|
||||
channelSet.settings.append(c.settings)
|
||||
bytes = channelSet.SerializeToString()
|
||||
s = base64.urlsafe_b64encode(bytes).decode('ascii')
|
||||
some_bytes = channelSet.SerializeToString()
|
||||
s = base64.urlsafe_b64encode(some_bytes).decode('ascii')
|
||||
return f"https://www.meshtastic.org/d/#{s}".replace("=", "")
|
||||
|
||||
def setURL(self, url):
|
||||
|
||||
@@ -75,8 +75,11 @@ class StreamInterface(MeshInterface):
|
||||
MeshInterface._disconnected(self)
|
||||
|
||||
logging.debug("Closing our port")
|
||||
# pylint: disable=E0203
|
||||
if not self.stream is None:
|
||||
# pylint: disable=E0203
|
||||
self.stream.close()
|
||||
# pylint: disable=W0201
|
||||
self.stream = None
|
||||
|
||||
def _writeBytes(self, b):
|
||||
|
||||
@@ -117,7 +117,7 @@ def runTests(numTests=50, wantAck=False, maxFailures=0):
|
||||
return True
|
||||
|
||||
|
||||
def testThread(numTests=5):
|
||||
def testThread(numTests=50):
|
||||
"""Test thread"""
|
||||
logging.info("Found devices, starting tests...")
|
||||
result = runTests(numTests, wantAck=True)
|
||||
@@ -137,10 +137,10 @@ def openDebugLog(portName):
|
||||
"""Open the debug log file"""
|
||||
debugname = "log" + portName.replace("/", "_")
|
||||
logging.info(f"Writing serial debugging to {debugname}")
|
||||
return open(debugname, 'w+', buffering=1)
|
||||
return open(debugname, 'w+', buffering=1, encoding='utf8')
|
||||
|
||||
|
||||
def testAll():
|
||||
def testAll(numTests=50):
|
||||
"""
|
||||
Run a series of tests using devices we can find.
|
||||
This is called from the cli with the "--test" option.
|
||||
@@ -157,7 +157,7 @@ def testAll():
|
||||
port, debugOut=openDebugLog(port), connectNow=True), ports))
|
||||
|
||||
logging.info("Ports opened, starting test")
|
||||
result = testThread()
|
||||
result = testThread(numTests)
|
||||
|
||||
for i in interfaces:
|
||||
i.close()
|
||||
@@ -174,7 +174,7 @@ def testSimulator():
|
||||
Run with
|
||||
python3 -c 'from meshtastic.test import testSimulator; testSimulator()'
|
||||
"""
|
||||
logging.basicConfig(level=logging.DEBUG if False else logging.INFO)
|
||||
logging.basicConfig(level=logging.DEBUG)
|
||||
logging.info("Connecting to simulator on localhost!")
|
||||
try:
|
||||
iface = TCPInterface("localhost")
|
||||
|
||||
@@ -1,14 +1,23 @@
|
||||
"""Meshtastic unit tests for node.py"""
|
||||
|
||||
import re
|
||||
|
||||
import pytest
|
||||
|
||||
from meshtastic.mesh_interface import MeshInterface
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
def test_MeshInterface():
|
||||
def test_MeshInterface(capsys):
|
||||
"""Test that we instantiate a MeshInterface"""
|
||||
iface = MeshInterface(noProto=True)
|
||||
iface.showInfo()
|
||||
iface.localNode.showInfo()
|
||||
out, err = capsys.readouterr()
|
||||
assert re.search(r'Owner: None \(None\)', out, re.MULTILINE)
|
||||
assert re.search(r'Nodes', out, re.MULTILINE)
|
||||
assert re.search(r'Preferences', out, re.MULTILINE)
|
||||
assert re.search(r'Channels', out, re.MULTILINE)
|
||||
assert re.search(r'Primary channel URL', out, re.MULTILINE)
|
||||
assert err == ''
|
||||
iface.close()
|
||||
|
||||
@@ -66,11 +66,20 @@ def test_smoke1_pos_fields():
|
||||
|
||||
|
||||
@pytest.mark.smoke1
|
||||
def test_smoke1_test():
|
||||
def test_smoke1_test_no_arg():
|
||||
"""Test --test
|
||||
Note: Test without arg.
|
||||
"""
|
||||
return_value, _ = subprocess.getstatusoutput('meshtastic --test')
|
||||
assert return_value == 2
|
||||
|
||||
|
||||
@pytest.mark.smoke1
|
||||
def test_smoke1_test_with_arg_but_no_hardware():
|
||||
"""Test --test
|
||||
Note: Since only one device is connected, it will not do much.
|
||||
"""
|
||||
return_value, out = subprocess.getstatusoutput('meshtastic --test')
|
||||
return_value, out = subprocess.getstatusoutput('meshtastic --test 5')
|
||||
assert re.search(r'^Warning: Must have at least two devices', out, re.MULTILINE)
|
||||
assert return_value == 1
|
||||
|
||||
@@ -364,6 +373,8 @@ def test_smoke1_seturl_invalid_url():
|
||||
assert re.match(r'Connected to radio', out)
|
||||
assert re.search('Warning: There were no settings', out, re.MULTILINE)
|
||||
assert return_value == 1
|
||||
# pause for the radio
|
||||
time.sleep(PAUSE_AFTER_COMMAND)
|
||||
|
||||
|
||||
@pytest.mark.smoke1
|
||||
@@ -382,6 +393,8 @@ def test_smoke1_configure():
|
||||
assert re.search('^Set screen_on_secs to 31536000', out, re.MULTILINE)
|
||||
assert re.search('^Set wait_bluetooth_secs to 31536000', out, re.MULTILINE)
|
||||
assert re.search('^Writing modified preferences to device', out, re.MULTILINE)
|
||||
# pause for the radio
|
||||
time.sleep(PAUSE_AFTER_REBOOT)
|
||||
|
||||
|
||||
@pytest.mark.smoke1
|
||||
|
||||
@@ -16,7 +16,7 @@ def test_smoke2_info():
|
||||
@pytest.mark.smoke2
|
||||
def test_smoke2_test():
|
||||
"""Test --test"""
|
||||
return_value, out = subprocess.getstatusoutput('meshtastic --test')
|
||||
return_value, out = subprocess.getstatusoutput('meshtastic --test 5')
|
||||
assert re.search(r'Writing serial debugging', out, re.MULTILINE)
|
||||
assert re.search(r'Ports opened', out, re.MULTILINE)
|
||||
assert re.search(r'Running 5 tests', out, re.MULTILINE)
|
||||
|
||||
@@ -1,8 +1,20 @@
|
||||
"""Meshtastic unit tests for node.py"""
|
||||
|
||||
import re
|
||||
|
||||
import pytest
|
||||
|
||||
from meshtastic.util import pskToString, our_exit
|
||||
from meshtastic.util import fixme, stripnl, pskToString, our_exit, support_info
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
def test_stripnl():
|
||||
"""Test stripnl"""
|
||||
assert stripnl('') == ''
|
||||
assert stripnl('a\n') == 'a'
|
||||
assert stripnl(' a \n ') == 'a'
|
||||
assert stripnl('a\nb') == 'a b'
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
def test_pskToString_empty_string():
|
||||
@@ -50,3 +62,23 @@ def test_our_exit_non_zero_return_value():
|
||||
our_exit("Error: Some message", 1)
|
||||
assert pytest_wrapped_e.type == SystemExit
|
||||
assert pytest_wrapped_e.value.code == 1
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
def test_fixme():
|
||||
"""Test fixme"""
|
||||
with pytest.raises(Exception) as pytest_wrapped_e:
|
||||
fixme("some exception")
|
||||
assert pytest_wrapped_e.type == Exception
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
def test_support_info(capsys):
|
||||
"""Test support_info"""
|
||||
support_info()
|
||||
out, err = capsys.readouterr()
|
||||
assert re.search(r'System', out, re.MULTILINE)
|
||||
assert re.search(r'Platform', out, re.MULTILINE)
|
||||
assert re.search(r'Machine', out, re.MULTILINE)
|
||||
assert re.search(r'Executable', out, re.MULTILINE)
|
||||
assert err == ''
|
||||
|
||||
@@ -18,6 +18,9 @@
|
||||
import logging
|
||||
import threading
|
||||
from pubsub import pub
|
||||
|
||||
from pytap2 import TapDevice
|
||||
|
||||
from . import portnums_pb2
|
||||
|
||||
# A new non standard log level that is lower level than DEBUG
|
||||
@@ -98,7 +101,6 @@ class Tunnel:
|
||||
|
||||
logging.debug("creating TUN device with MTU=200")
|
||||
# FIXME - figure out real max MTU, it should be 240 - the overhead bytes for SubPacket and Data
|
||||
from pytap2 import TapDevice
|
||||
self.tun = TapDevice(name="mesh")
|
||||
self.tun.up()
|
||||
self.tun.ifconfig(address=myAddr, netmask=netmask, mtu=200)
|
||||
@@ -108,6 +110,7 @@ class Tunnel:
|
||||
self._rxThread.start()
|
||||
|
||||
def onReceive(self, packet):
|
||||
"""onReceive"""
|
||||
p = packet["decoded"]["payload"]
|
||||
if packet["from"] == self.iface.myInfo.my_node_num:
|
||||
logging.debug("Ignoring message we sent")
|
||||
@@ -206,4 +209,5 @@ class Tunnel:
|
||||
f"Dropping packet because no node found for destIP={ipstr(destAddr)}")
|
||||
|
||||
def close(self):
|
||||
"""Close"""
|
||||
self.tun.close()
|
||||
|
||||
Reference in New Issue
Block a user