mirror of
https://github.com/meshtastic/python.git
synced 2025-12-25 17:07:53 -05:00
Compare commits
38 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
b08e96b66a | ||
|
|
9e74ead54e | ||
|
|
2cf52f3df6 | ||
|
|
50e9a0a9f7 | ||
|
|
841c44e05c | ||
|
|
465bafeb30 | ||
|
|
cb8dafbd31 | ||
|
|
52db617b06 | ||
|
|
ec622590da | ||
|
|
345cb1bdc4 | ||
|
|
8ca692a26e | ||
|
|
d1ea68d7dc | ||
|
|
b56440a4e8 | ||
|
|
1401b949a3 | ||
|
|
97f3ce6198 | ||
|
|
8019391914 | ||
|
|
8d10010ab1 | ||
|
|
a2b4d2a96a | ||
|
|
4de558bdf6 | ||
|
|
8ff06a0de1 | ||
|
|
f8ad6061c1 | ||
|
|
aa4cdb6aea | ||
|
|
cba424fde0 | ||
|
|
6c7a870645 | ||
|
|
f42b1ad4e0 | ||
|
|
30a51952e0 | ||
|
|
5de754c5ab | ||
|
|
bda446c7b5 | ||
|
|
f79540f197 | ||
|
|
d507697e56 | ||
|
|
acbc1f2e30 | ||
|
|
7b3c68119c | ||
|
|
39f97166b0 | ||
|
|
541d19cafb | ||
|
|
969a81b779 | ||
|
|
3f76c1efb0 | ||
|
|
774849189f | ||
|
|
53d626aa72 |
90
.github/workflows/build_executables.yml
vendored
Normal file
90
.github/workflows/build_executables.yml
vendored
Normal file
@@ -0,0 +1,90 @@
|
||||
name: Build and publish standalone executables
|
||||
|
||||
on: workflow_dispatch
|
||||
|
||||
jobs:
|
||||
|
||||
build-and-publish-mac:
|
||||
runs-on: macos-latest
|
||||
|
||||
steps:
|
||||
- name: Checkout
|
||||
uses: actions/checkout@v2
|
||||
|
||||
- name: Set up Python 3.9
|
||||
uses: actions/setup-python@v2
|
||||
with:
|
||||
python-version: 3.9
|
||||
|
||||
- name: Setup code signing
|
||||
env:
|
||||
MACOS_CERTIFICATE: ${{ secrets.MACOS_CERTIFICATE }}
|
||||
MACOS_CERTIFICATE_PWD: ${{ secrets.MACOS_CERTIFICATE_PWD }}
|
||||
MACOS_KEYCHAIN_PASSWORD: ${{ secrets.MACOS_KEYCHAIN_PASSWORD }}
|
||||
run: |
|
||||
echo $MACOS_CERTIFICATE | base64 --decode > certificate.p12
|
||||
security create-keychain -p "$MACOS_KEYCHAIN_PASSWORD" meshtastic.keychain
|
||||
security default-keychain -s meshtastic.keychain
|
||||
security unlock-keychain -p "$MACOS_KEYCHAIN_PASSWORD" meshtastic.keychain
|
||||
security import certificate.p12 -k meshtastic.keychain -P "$MACOS_CERTIFICATE_PWD" -T /usr/bin/codesign
|
||||
security set-key-partition-list -S apple-tool:,apple:,codesign: -s -k "$MACOS_KEYCHAIN_PASSWORD" meshtastic.keychain
|
||||
|
||||
- name: Build
|
||||
env:
|
||||
MACOS_SIGNING_IDENTITY: ${{ secrets.MACOS_SIGNING_IDENTITY }}
|
||||
run: |
|
||||
pip install pyinstaller
|
||||
pip install -r requirements.txt
|
||||
pip install .
|
||||
pyinstaller -F -n meshtastic --collect-all meshtastic --codesign-identity "$MACOS_SIGNING_IDENTITY" meshtastic/__main__.py
|
||||
- uses: actions/upload-artifact@v2
|
||||
with:
|
||||
name: meshtastic_mac
|
||||
path: dist
|
||||
|
||||
build-and-publish-ubuntu:
|
||||
runs-on: ubuntu-latest
|
||||
|
||||
steps:
|
||||
- name: Checkout
|
||||
uses: actions/checkout@v2
|
||||
|
||||
- name: Set up Python 3.9
|
||||
uses: actions/setup-python@v2
|
||||
with:
|
||||
python-version: 3.9
|
||||
|
||||
- name: Build
|
||||
run: |
|
||||
pip install pyinstaller
|
||||
pip install -r requirements.txt
|
||||
pip install .
|
||||
pyinstaller -F -n meshtastic --collect-all meshtastic meshtastic/__main__.py
|
||||
- uses: actions/upload-artifact@v2
|
||||
with:
|
||||
name: meshtastic_ubuntu
|
||||
path: dist
|
||||
|
||||
|
||||
build-and-publish-windows:
|
||||
runs-on: windows-latest
|
||||
|
||||
steps:
|
||||
- name: Checkout
|
||||
uses: actions/checkout@v2
|
||||
|
||||
- name: Set up Python 3.9
|
||||
uses: actions/setup-python@v2
|
||||
with:
|
||||
python-version: 3.9
|
||||
|
||||
- name: Build
|
||||
run: |
|
||||
pip install pyinstaller
|
||||
pip install -r requirements.txt
|
||||
pip install .
|
||||
pyinstaller -F -n meshtastic --collect-all meshtastic meshtastic/__main__.py
|
||||
- uses: actions/upload-artifact@v2
|
||||
with:
|
||||
name: meshtastic_windows
|
||||
path: dist
|
||||
2
.github/workflows/ci.yml
vendored
2
.github/workflows/ci.yml
vendored
@@ -17,6 +17,7 @@ jobs:
|
||||
- "3.7"
|
||||
- "3.8"
|
||||
- "3.9"
|
||||
- "3.10"
|
||||
steps:
|
||||
- uses: actions/checkout@v2
|
||||
- name: Install Python 3
|
||||
@@ -58,6 +59,7 @@ jobs:
|
||||
- "3.7"
|
||||
- "3.8"
|
||||
- "3.9"
|
||||
- "3.10"
|
||||
steps:
|
||||
- uses: actions/checkout@v2
|
||||
- name: Install Python 3
|
||||
|
||||
1
.gitignore
vendored
1
.gitignore
vendored
@@ -14,3 +14,4 @@ venv/
|
||||
.DS_Store
|
||||
__pycache__
|
||||
examples/__pycache__
|
||||
meshtastic.spec
|
||||
|
||||
6
Makefile
6
Makefile
@@ -2,6 +2,10 @@
|
||||
test:
|
||||
pytest -m unit
|
||||
|
||||
# only run the smoke tests against the virtual device
|
||||
virt:
|
||||
pytest -m smokevirt
|
||||
|
||||
# local install
|
||||
install:
|
||||
pip install .
|
||||
@@ -16,7 +20,7 @@ lint:
|
||||
|
||||
# show the slowest unit tests
|
||||
slow:
|
||||
pytest --durations=5
|
||||
pytest -m unit --durations=5
|
||||
|
||||
# run the coverage report and open results in a browser
|
||||
cov:
|
||||
|
||||
@@ -5,6 +5,7 @@
|
||||
import argparse
|
||||
import platform
|
||||
import logging
|
||||
import os
|
||||
import sys
|
||||
import time
|
||||
import yaml
|
||||
@@ -17,6 +18,7 @@ from meshtastic import remote_hardware
|
||||
from meshtastic.ble_interface import BLEInterface
|
||||
from meshtastic import portnums_pb2, channel_pb2, radioconfig_pb2
|
||||
from meshtastic.globals import Globals
|
||||
from meshtastic.__init__ import BROADCAST_ADDR
|
||||
|
||||
|
||||
def onReceive(packet, interface):
|
||||
@@ -139,14 +141,6 @@ def onConnected(interface):
|
||||
if not args.export_config:
|
||||
print("Connected to radio")
|
||||
|
||||
def getNode():
|
||||
"""This operation could be expensive, so we try to cache the results"""
|
||||
targetNode = our_globals.get_target_node()
|
||||
if not targetNode:
|
||||
targetNode = interface.getNode(args.destOrLocal)
|
||||
our_globals.set_target_node(targetNode)
|
||||
return targetNode
|
||||
|
||||
if args.setlat or args.setlon or args.setalt:
|
||||
closeNow = True
|
||||
|
||||
@@ -178,12 +172,12 @@ def onConnected(interface):
|
||||
if args.set_owner:
|
||||
closeNow = True
|
||||
print(f"Setting device owner to {args.set_owner}")
|
||||
getNode().setOwner(args.set_owner)
|
||||
interface.getNode(args.dest).setOwner(args.set_owner)
|
||||
|
||||
if args.pos_fields:
|
||||
# If --pos-fields invoked with args, set position fields
|
||||
closeNow = True
|
||||
prefs = getNode().radioConfig.preferences
|
||||
prefs = interface.getNode(args.dest).radioConfig.preferences
|
||||
allFields = 0
|
||||
|
||||
try:
|
||||
@@ -200,12 +194,12 @@ def onConnected(interface):
|
||||
print(f"Setting position fields to {allFields}")
|
||||
setPref(prefs, 'position_flags', ('%d' % allFields))
|
||||
print("Writing modified preferences to device")
|
||||
getNode().writeConfig()
|
||||
interface.getNode(args.dest).writeConfig()
|
||||
|
||||
elif args.pos_fields is not None:
|
||||
# If --pos-fields invoked without args, read and display current value
|
||||
closeNow = True
|
||||
prefs = getNode().radioConfig.preferences
|
||||
prefs = interface.getNode(args.dest).radioConfig.preferences
|
||||
|
||||
fieldNames = []
|
||||
for bit in radioconfig_pb2.PositionFlags.values():
|
||||
@@ -224,81 +218,85 @@ def onConnected(interface):
|
||||
print(sorted(meshtastic.mesh_pb2.Team.keys()))
|
||||
else:
|
||||
print(f"Setting team to {meshtastic.mesh_pb2.Team.Name(v_team)}")
|
||||
getNode().setOwner(team=v_team)
|
||||
interface.getNode(args.dest).setOwner(team=v_team)
|
||||
|
||||
if args.set_ham:
|
||||
closeNow = True
|
||||
print(f"Setting Ham ID to {args.set_ham} and turning off encryption")
|
||||
getNode().setOwner(args.set_ham, is_licensed=True)
|
||||
interface.getNode(args.dest).setOwner(args.set_ham, is_licensed=True)
|
||||
# Must turn off encryption on primary channel
|
||||
getNode().turnOffEncryptionOnPrimaryChannel()
|
||||
interface.getNode(args.dest).turnOffEncryptionOnPrimaryChannel()
|
||||
|
||||
if args.reboot:
|
||||
closeNow = True
|
||||
getNode().reboot()
|
||||
interface.getNode(args.dest).reboot()
|
||||
|
||||
if args.sendtext:
|
||||
closeNow = True
|
||||
channelIndex = 0
|
||||
if args.ch_index is not None:
|
||||
channelIndex = int(args.ch_index)
|
||||
ch = getNode().getChannelByChannelIndex(channelIndex)
|
||||
ch = interface.localNode.getChannelByChannelIndex(channelIndex)
|
||||
logging.debug(f'ch:{ch}')
|
||||
if ch and ch.role != channel_pb2.Channel.Role.DISABLED:
|
||||
print(f"Sending text message {args.sendtext} to {args.destOrAll} on channelIndex:{channelIndex}")
|
||||
interface.sendText(args.sendtext, args.destOrAll, wantAck=True, channelIndex=channelIndex)
|
||||
print(f"Sending text message {args.sendtext} to {args.dest} on channelIndex:{channelIndex}")
|
||||
interface.sendText(args.sendtext, args.dest, wantAck=True, channelIndex=channelIndex)
|
||||
else:
|
||||
meshtastic.util.our_exit(f"Warning: {channelIndex} is not a valid channel. Channel must not be DISABLED.")
|
||||
|
||||
if args.sendping:
|
||||
payload = str.encode("test string")
|
||||
print(f"Sending ping message to {args.destOrAll}")
|
||||
interface.sendData(payload, args.destOrAll, portNum=portnums_pb2.PortNum.REPLY_APP,
|
||||
print(f"Sending ping message to {args.dest}")
|
||||
interface.sendData(payload, args.dest, portNum=portnums_pb2.PortNum.REPLY_APP,
|
||||
wantAck=True, wantResponse=True)
|
||||
|
||||
if args.gpio_wrb or args.gpio_rd or args.gpio_watch:
|
||||
rhc = remote_hardware.RemoteHardwareClient(interface)
|
||||
if args.dest == BROADCAST_ADDR:
|
||||
meshtastic.util.our_exit("Warning: Must use a destination node ID.")
|
||||
else:
|
||||
rhc = remote_hardware.RemoteHardwareClient(interface)
|
||||
|
||||
if args.gpio_wrb:
|
||||
bitmask = 0
|
||||
bitval = 0
|
||||
for wrpair in (args.gpio_wrb or []):
|
||||
bitmask |= 1 << int(wrpair[0])
|
||||
bitval |= int(wrpair[1]) << int(wrpair[0])
|
||||
print(f"Writing GPIO mask 0x{bitmask:x} with value 0x{bitval:x} to {args.dest}")
|
||||
rhc.writeGPIOs(args.dest, bitmask, bitval)
|
||||
closeNow = True
|
||||
if args.gpio_wrb:
|
||||
bitmask = 0
|
||||
bitval = 0
|
||||
for wrpair in (args.gpio_wrb or []):
|
||||
bitmask |= 1 << int(wrpair[0])
|
||||
bitval |= int(wrpair[1]) << int(wrpair[0])
|
||||
print(f"Writing GPIO mask 0x{bitmask:x} with value 0x{bitval:x} to {args.dest}")
|
||||
rhc.writeGPIOs(args.dest, bitmask, bitval)
|
||||
closeNow = True
|
||||
|
||||
if args.gpio_rd:
|
||||
bitmask = int(args.gpio_rd, 16)
|
||||
print(f"Reading GPIO mask 0x{bitmask:x} from {args.dest}")
|
||||
interface.mask = bitmask
|
||||
rhc.readGPIOs(args.dest, bitmask, None)
|
||||
if not interface.noProto:
|
||||
# wait up to X seconds for a response
|
||||
for _ in range(10):
|
||||
if args.gpio_rd:
|
||||
bitmask = int(args.gpio_rd, 16)
|
||||
print(f"Reading GPIO mask 0x{bitmask:x} from {args.dest}")
|
||||
interface.mask = bitmask
|
||||
rhc.readGPIOs(args.dest, bitmask, None)
|
||||
if not interface.noProto:
|
||||
# wait up to X seconds for a response
|
||||
for _ in range(10):
|
||||
time.sleep(1)
|
||||
if interface.gotResponse:
|
||||
break
|
||||
logging.debug(f'end of gpio_rd')
|
||||
|
||||
if args.gpio_watch:
|
||||
bitmask = int(args.gpio_watch, 16)
|
||||
print(f"Watching GPIO mask 0x{bitmask:x} from {args.dest}. Press ctrl-c to exit")
|
||||
while True:
|
||||
rhc.watchGPIOs(args.dest, bitmask)
|
||||
time.sleep(1)
|
||||
if interface.gotResponse:
|
||||
break
|
||||
logging.debug(f'end of gpio_rd')
|
||||
|
||||
if args.gpio_watch:
|
||||
bitmask = int(args.gpio_watch, 16)
|
||||
print(f"Watching GPIO mask 0x{bitmask:x} from {args.dest}. Press ctrl-c to exit")
|
||||
while True:
|
||||
rhc.watchGPIOs(args.dest, bitmask)
|
||||
time.sleep(1)
|
||||
|
||||
# handle settings
|
||||
if args.set:
|
||||
closeNow = True
|
||||
prefs = getNode().radioConfig.preferences
|
||||
prefs = interface.getNode(args.dest).radioConfig.preferences
|
||||
|
||||
# Handle the int/float/bool arguments
|
||||
for pref in args.set:
|
||||
setPref(prefs, pref[0], pref[1])
|
||||
|
||||
print("Writing modified preferences to device")
|
||||
getNode().writeConfig()
|
||||
interface.getNode(args.dest).writeConfig()
|
||||
|
||||
if args.configure:
|
||||
with open(args.configure[0], encoding='utf8') as file:
|
||||
@@ -307,11 +305,11 @@ def onConnected(interface):
|
||||
|
||||
if 'owner' in configuration:
|
||||
print(f"Setting device owner to {configuration['owner']}")
|
||||
getNode().setOwner(configuration['owner'])
|
||||
interface.getNode(args.dest).setOwner(configuration['owner'])
|
||||
|
||||
if 'channel_url' in configuration:
|
||||
print("Setting channel url to", configuration['channel_url'])
|
||||
getNode().setURL(configuration['channel_url'])
|
||||
interface.getNode(args.dest).setURL(configuration['channel_url'])
|
||||
|
||||
if 'location' in configuration:
|
||||
alt = 0
|
||||
@@ -336,11 +334,11 @@ def onConnected(interface):
|
||||
interface.localNode.writeConfig()
|
||||
|
||||
if 'user_prefs' in configuration:
|
||||
prefs = getNode().radioConfig.preferences
|
||||
prefs = interface.getNode(args.dest).radioConfig.preferences
|
||||
for pref in configuration['user_prefs']:
|
||||
setPref(prefs, pref, str(configuration['user_prefs'][pref]))
|
||||
print("Writing modified preferences to device")
|
||||
getNode().writeConfig()
|
||||
interface.getNode(args.dest).writeConfig()
|
||||
|
||||
if args.export_config:
|
||||
# export the configuration (the opposite of '--configure')
|
||||
@@ -349,7 +347,7 @@ def onConnected(interface):
|
||||
|
||||
if args.seturl:
|
||||
closeNow = True
|
||||
getNode().setURL(args.seturl)
|
||||
interface.getNode(args.dest).setURL(args.seturl)
|
||||
|
||||
# handle changing channels
|
||||
|
||||
@@ -357,7 +355,7 @@ def onConnected(interface):
|
||||
closeNow = True
|
||||
if len(args.ch_add) > 10:
|
||||
meshtastic.util.our_exit("Warning: Channel name must be shorter. Channel not added.")
|
||||
n = getNode()
|
||||
n = interface.getNode(args.dest)
|
||||
ch = n.getChannelByName(args.ch_add)
|
||||
if ch:
|
||||
meshtastic.util.our_exit(f"Warning: This node already has a '{args.ch_add}' channel. No changes were made.")
|
||||
@@ -385,7 +383,7 @@ def onConnected(interface):
|
||||
meshtastic.util.our_exit("Warning: Cannot delete primary channel.", 1)
|
||||
else:
|
||||
print(f"Deleting channel {channelIndex}")
|
||||
ch = getNode().deleteChannel(channelIndex)
|
||||
ch = interface.getNode(args.dest).deleteChannel(channelIndex)
|
||||
|
||||
ch_changes = [args.ch_longslow, args.ch_longfast,
|
||||
args.ch_mediumslow, args.ch_mediumfast,
|
||||
@@ -401,7 +399,7 @@ def onConnected(interface):
|
||||
channelIndex = 0
|
||||
else:
|
||||
meshtastic.util.our_exit("Warning: Need to specify '--ch-index'.", 1)
|
||||
ch = getNode().channels[channelIndex]
|
||||
ch = interface.getNode(args.dest).channels[channelIndex]
|
||||
|
||||
if any_primary_channel_changes or args.ch_enable or args.ch_disable:
|
||||
|
||||
@@ -462,21 +460,22 @@ def onConnected(interface):
|
||||
ch.role = channel_pb2.Channel.Role.DISABLED
|
||||
|
||||
print(f"Writing modified channels to device")
|
||||
getNode().writeChannel(channelIndex)
|
||||
interface.getNode(args.dest).writeChannel(channelIndex)
|
||||
|
||||
if args.info:
|
||||
print("")
|
||||
if not args.dest: # If we aren't trying to talk to our local node, don't show it
|
||||
# If we aren't trying to talk to our local node, don't show it
|
||||
if args.dest == BROADCAST_ADDR:
|
||||
interface.showInfo()
|
||||
|
||||
print("")
|
||||
getNode().showInfo()
|
||||
interface.getNode(args.dest).showInfo()
|
||||
closeNow = True # FIXME, for now we leave the link up while talking to remote nodes
|
||||
print("")
|
||||
|
||||
if args.get:
|
||||
closeNow = True
|
||||
prefs = getNode().radioConfig.preferences
|
||||
prefs = interface.getNode(args.dest).radioConfig.preferences
|
||||
|
||||
# Handle the int/float/bool arguments
|
||||
for pref in args.get:
|
||||
@@ -590,13 +589,8 @@ def common():
|
||||
channelIndex = int(args.ch_index)
|
||||
our_globals.set_channel_index(channelIndex)
|
||||
|
||||
# Some commands require dest to be set, so we now use destOrAll/destOrLocal for more lenient commands
|
||||
if not args.dest:
|
||||
args.destOrAll = "^all"
|
||||
args.destOrLocal = "^local"
|
||||
else:
|
||||
args.destOrAll = args.dest
|
||||
args.destOrLocal = args.dest # FIXME, temp hack for debugging remove
|
||||
args.dest = BROADCAST_ADDR
|
||||
|
||||
if not args.seriallog:
|
||||
if args.noproto:
|
||||
@@ -635,12 +629,19 @@ def common():
|
||||
elif args.host:
|
||||
client = meshtastic.tcp_interface.TCPInterface(args.host, debugOut=logfile, noProto=args.noproto)
|
||||
else:
|
||||
client = meshtastic.serial_interface.SerialInterface(args.port, debugOut=logfile, noProto=args.noproto)
|
||||
try:
|
||||
client = meshtastic.serial_interface.SerialInterface(args.port, debugOut=logfile, noProto=args.noproto)
|
||||
except PermissionError as ex:
|
||||
username = os.getlogin()
|
||||
message = "Permission Error:\n"
|
||||
message += " Need to add yourself to the 'dialout' group by running:\n"
|
||||
message += f" sudo usermod -a -G dialout {username}\n"
|
||||
message += " After running that command, log out and re-login for it to take effect.\n"
|
||||
message += f"Error was:{ex}"
|
||||
meshtastic.util.our_exit(message)
|
||||
|
||||
# We assume client is fully connected now
|
||||
onConnected(client)
|
||||
#if logfile:
|
||||
#logfile.close()
|
||||
|
||||
have_tunnel = platform.system() == 'Linux'
|
||||
if args.noproto or args.reply or (have_tunnel and args.tunnel): # loop until someone presses ctrlc
|
||||
@@ -818,8 +819,8 @@ def initParser():
|
||||
|
||||
parser.set_defaults(deprecated=None)
|
||||
|
||||
parser.add_argument('--version', action='version',
|
||||
version=f"{pkg_resources.require('meshtastic')[0].version}")
|
||||
the_version = pkg_resources.get_distribution("meshtastic").version
|
||||
parser.add_argument('--version', action='version', version=f"{the_version}")
|
||||
|
||||
parser.add_argument(
|
||||
"--support", action='store_true', help="Show support info (useful when troubleshooting an issue)")
|
||||
|
||||
@@ -27,7 +27,6 @@ class Globals:
|
||||
Globals.__instance = self
|
||||
self.args = None
|
||||
self.parser = None
|
||||
self.target_node = None
|
||||
self.channel_index = None
|
||||
self.logfile = None
|
||||
self.tunnelInstance = None
|
||||
@@ -36,7 +35,6 @@ class Globals:
|
||||
"""Reset all of our globals. If you add a member, add it to this method, too."""
|
||||
self.args = None
|
||||
self.parser = None
|
||||
self.target_node = None
|
||||
self.channel_index = None
|
||||
self.logfile = None
|
||||
self.tunnelInstance = None
|
||||
@@ -50,10 +48,6 @@ class Globals:
|
||||
"""Set the parser"""
|
||||
self.parser = parser
|
||||
|
||||
def set_target_node(self, target_node):
|
||||
"""Set the target_node"""
|
||||
self.target_node = target_node
|
||||
|
||||
def set_channel_index(self, channel_index):
|
||||
"""Set the channel_index"""
|
||||
self.channel_index = channel_index
|
||||
@@ -75,10 +69,6 @@ class Globals:
|
||||
"""Get parser"""
|
||||
return self.parser
|
||||
|
||||
def get_target_node(self):
|
||||
"""Get target_node"""
|
||||
return self.target_node
|
||||
|
||||
def get_channel_index(self):
|
||||
"""Get channel_index"""
|
||||
return self.channel_index
|
||||
|
||||
@@ -159,7 +159,7 @@ class MeshInterface:
|
||||
|
||||
def getNode(self, nodeId):
|
||||
"""Return a node object which contains device settings and channel info"""
|
||||
if nodeId == LOCAL_ADDR:
|
||||
if nodeId in (LOCAL_ADDR, BROADCAST_ADDR):
|
||||
return self.localNode
|
||||
else:
|
||||
n = meshtastic.node.Node(self, nodeId)
|
||||
|
||||
@@ -257,6 +257,7 @@ class Node:
|
||||
p = admin_pb2.AdminMessage()
|
||||
p.get_radio_request = True
|
||||
|
||||
# TODO: should we check that localNode has an 'admin' channel?
|
||||
# Show progress message for super slow operations
|
||||
if self != self.iface.localNode:
|
||||
print("Requesting preferences from remote node.")
|
||||
|
||||
@@ -40,28 +40,25 @@ class SerialInterface(StreamInterface):
|
||||
|
||||
# first we need to set the HUPCL so the device will not reboot based on RTS and/or DTR
|
||||
# see https://github.com/pyserial/pyserial/issues/124
|
||||
if not self.noProto:
|
||||
if platform.system() != 'Windows':
|
||||
with open(devPath, encoding='utf8') as f:
|
||||
attrs = termios.tcgetattr(f)
|
||||
attrs[2] = attrs[2] & ~termios.HUPCL
|
||||
termios.tcsetattr(f, termios.TCSAFLUSH, attrs)
|
||||
f.close()
|
||||
time.sleep(0.1)
|
||||
if platform.system() != 'Windows':
|
||||
with open(devPath, encoding='utf8') as f:
|
||||
attrs = termios.tcgetattr(f)
|
||||
attrs[2] = attrs[2] & ~termios.HUPCL
|
||||
termios.tcsetattr(f, termios.TCSAFLUSH, attrs)
|
||||
f.close()
|
||||
time.sleep(0.1)
|
||||
|
||||
self.stream = serial.Serial(devPath, 921600, exclusive=True, timeout=0.5, write_timeout=0)
|
||||
if not self.noProto:
|
||||
self.stream.flush()
|
||||
time.sleep(0.1)
|
||||
self.stream.flush()
|
||||
time.sleep(0.1)
|
||||
|
||||
StreamInterface.__init__(self, debugOut=debugOut, noProto=noProto, connectNow=connectNow)
|
||||
|
||||
def close(self):
|
||||
"""Close a connection to the device"""
|
||||
if not self.noProto:
|
||||
self.stream.flush()
|
||||
time.sleep(0.1)
|
||||
self.stream.flush()
|
||||
time.sleep(0.1)
|
||||
self.stream.flush()
|
||||
time.sleep(0.1)
|
||||
self.stream.flush()
|
||||
time.sleep(0.1)
|
||||
logging.debug("Closing Serial stream")
|
||||
StreamInterface.close(self)
|
||||
|
||||
@@ -119,7 +119,6 @@ def test_main_test_no_ports(patched_find_ports, reset_globals, capsys):
|
||||
sys.argv = ['', '--test']
|
||||
Globals.getInstance().set_args(sys.argv)
|
||||
|
||||
assert Globals.getInstance().get_target_node() is None
|
||||
with pytest.raises(SystemExit) as pytest_wrapped_e:
|
||||
main()
|
||||
assert pytest_wrapped_e.type == SystemExit
|
||||
@@ -137,7 +136,6 @@ def test_main_test_one_port(patched_find_ports, reset_globals, capsys):
|
||||
sys.argv = ['', '--test']
|
||||
Globals.getInstance().set_args(sys.argv)
|
||||
|
||||
assert Globals.getInstance().get_target_node() is None
|
||||
with pytest.raises(SystemExit) as pytest_wrapped_e:
|
||||
main()
|
||||
assert pytest_wrapped_e.type == SystemExit
|
||||
@@ -183,7 +181,7 @@ def test_main_test_two_ports_fails(patched_test_all, reset_globals, capsys):
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
def test_main_info(capsys, reset_globals):
|
||||
def test_main_info(capsys, caplog, reset_globals):
|
||||
"""Test --info"""
|
||||
sys.argv = ['', '--info']
|
||||
Globals.getInstance().set_args(sys.argv)
|
||||
@@ -192,13 +190,37 @@ def test_main_info(capsys, reset_globals):
|
||||
def mock_showInfo():
|
||||
print('inside mocked showInfo')
|
||||
iface.showInfo.side_effect = mock_showInfo
|
||||
with patch('meshtastic.serial_interface.SerialInterface', return_value=iface) as mo:
|
||||
main()
|
||||
with caplog.at_level(logging.DEBUG):
|
||||
with patch('meshtastic.serial_interface.SerialInterface', return_value=iface) as mo:
|
||||
main()
|
||||
out, err = capsys.readouterr()
|
||||
assert re.search(r'Connected to radio', out, re.MULTILINE)
|
||||
assert re.search(r'inside mocked showInfo', out, re.MULTILINE)
|
||||
assert err == ''
|
||||
mo.assert_called()
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
@patch('os.getlogin')
|
||||
def test_main_info_with_permission_error(patched_getlogin, capsys, caplog, reset_globals):
|
||||
"""Test --info"""
|
||||
sys.argv = ['', '--info']
|
||||
Globals.getInstance().set_args(sys.argv)
|
||||
|
||||
patched_getlogin.return_value = 'me'
|
||||
|
||||
iface = MagicMock(autospec=SerialInterface)
|
||||
with caplog.at_level(logging.DEBUG):
|
||||
with pytest.raises(SystemExit) as pytest_wrapped_e:
|
||||
with patch('meshtastic.serial_interface.SerialInterface', return_value=iface) as mo:
|
||||
mo.side_effect = PermissionError('bla bla')
|
||||
main()
|
||||
assert pytest_wrapped_e.type == SystemExit
|
||||
assert pytest_wrapped_e.value.code == 1
|
||||
out, err = capsys.readouterr()
|
||||
assert re.search(r'Connected to radio', out, re.MULTILINE)
|
||||
assert re.search(r'inside mocked showInfo', out, re.MULTILINE)
|
||||
patched_getlogin.assert_called()
|
||||
assert re.search(r'Need to add yourself', out, re.MULTILINE)
|
||||
assert err == ''
|
||||
mo.assert_called()
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
@@ -456,63 +478,71 @@ def test_main_sendtext_with_channel(capsys, reset_globals):
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
def test_main_sendtext_with_invalid_channel(capsys, reset_globals):
|
||||
def test_main_sendtext_with_invalid_channel(caplog, capsys, reset_globals):
|
||||
"""Test --sendtext"""
|
||||
sys.argv = ['', '--sendtext', 'hello', '--ch-index', '-1']
|
||||
Globals.getInstance().set_args(sys.argv)
|
||||
|
||||
iface = MagicMock(autospec=SerialInterface)
|
||||
iface.getChannelByChannelIndex.return_value = None
|
||||
with patch('meshtastic.serial_interface.SerialInterface', return_value=iface) as mo:
|
||||
iface.getNode.return_value.getChannelByChannelIndex.return_value = None
|
||||
with pytest.raises(SystemExit) as pytest_wrapped_e:
|
||||
main()
|
||||
assert pytest_wrapped_e.type == SystemExit
|
||||
assert pytest_wrapped_e.value.code == 1
|
||||
out, err = capsys.readouterr()
|
||||
assert re.search(r'is not a valid channel', out, re.MULTILINE)
|
||||
assert err == ''
|
||||
mo.assert_called()
|
||||
iface.localNode.getChannelByChannelIndex.return_value = None
|
||||
|
||||
with caplog.at_level(logging.DEBUG):
|
||||
with patch('meshtastic.serial_interface.SerialInterface', return_value=iface) as mo:
|
||||
with pytest.raises(SystemExit) as pytest_wrapped_e:
|
||||
main()
|
||||
assert pytest_wrapped_e.type == SystemExit
|
||||
assert pytest_wrapped_e.value.code == 1
|
||||
out, err = capsys.readouterr()
|
||||
assert re.search(r'is not a valid channel', out, re.MULTILINE)
|
||||
assert err == ''
|
||||
mo.assert_called()
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
def test_main_sendtext_with_invalid_channel_nine(capsys, reset_globals):
|
||||
def test_main_sendtext_with_invalid_channel_nine(caplog, capsys, reset_globals):
|
||||
"""Test --sendtext"""
|
||||
sys.argv = ['', '--sendtext', 'hello', '--ch-index', '9']
|
||||
Globals.getInstance().set_args(sys.argv)
|
||||
|
||||
iface = MagicMock(autospec=SerialInterface)
|
||||
with patch('meshtastic.serial_interface.SerialInterface', return_value=iface) as mo:
|
||||
iface.getNode.return_value.getChannelByChannelIndex.return_value = None
|
||||
with pytest.raises(SystemExit) as pytest_wrapped_e:
|
||||
main()
|
||||
assert pytest_wrapped_e.type == SystemExit
|
||||
assert pytest_wrapped_e.value.code == 1
|
||||
out, err = capsys.readouterr()
|
||||
assert re.search(r'is not a valid channel', out, re.MULTILINE)
|
||||
assert err == ''
|
||||
mo.assert_called()
|
||||
iface.localNode.getChannelByChannelIndex.return_value = None
|
||||
|
||||
with caplog.at_level(logging.DEBUG):
|
||||
with patch('meshtastic.serial_interface.SerialInterface', return_value=iface) as mo:
|
||||
with pytest.raises(SystemExit) as pytest_wrapped_e:
|
||||
main()
|
||||
assert pytest_wrapped_e.type == SystemExit
|
||||
assert pytest_wrapped_e.value.code == 1
|
||||
out, err = capsys.readouterr()
|
||||
assert re.search(r'is not a valid channel', out, re.MULTILINE)
|
||||
assert err == ''
|
||||
mo.assert_called()
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
def test_main_sendtext_with_dest(capsys, reset_globals):
|
||||
def test_main_sendtext_with_dest(capsys, caplog, reset_globals, iface_with_nodes):
|
||||
"""Test --sendtext with --dest"""
|
||||
sys.argv = ['', '--sendtext', 'hello', '--dest', 'foo']
|
||||
Globals.getInstance().set_args(sys.argv)
|
||||
|
||||
iface = MagicMock(autospec=SerialInterface)
|
||||
def mock_sendText(text, dest, wantAck, channelIndex):
|
||||
print('inside mocked sendText')
|
||||
iface.sendText.side_effect = mock_sendText
|
||||
iface = iface_with_nodes
|
||||
iface.myInfo.my_node_num = 2475227164
|
||||
mocked_channel = MagicMock(autospec=Channel)
|
||||
iface.localNode.getChannelByChannelIndex = mocked_channel
|
||||
|
||||
with patch('meshtastic.serial_interface.SerialInterface', return_value=iface) as mo:
|
||||
main()
|
||||
out, err = capsys.readouterr()
|
||||
assert re.search(r'Connected to radio', out, re.MULTILINE)
|
||||
assert re.search(r'Sending text message', out, re.MULTILINE)
|
||||
assert re.search(r'inside mocked sendText', out, re.MULTILINE)
|
||||
assert err == ''
|
||||
mo.assert_called()
|
||||
with patch('meshtastic.serial_interface.SerialInterface', return_value=iface):
|
||||
with caplog.at_level(logging.DEBUG):
|
||||
|
||||
with pytest.raises(SystemExit) as pytest_wrapped_e:
|
||||
main()
|
||||
assert pytest_wrapped_e.type == SystemExit
|
||||
assert pytest_wrapped_e.value.code == 1
|
||||
out, err = capsys.readouterr()
|
||||
assert re.search(r'Connected to radio', out, re.MULTILINE)
|
||||
assert not re.search(r"Warning: 0 is not a valid channel", out, re.MULTILINE)
|
||||
assert not re.search(r"There is a SECONDARY channel named 'admin'", out, re.MULTILINE)
|
||||
assert re.search(r'Warning: NodeId foo not found in DB', out, re.MULTILINE)
|
||||
assert err == ''
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
@@ -1094,18 +1124,14 @@ def test_main_pos_fields_no_args(capsys, reset_globals):
|
||||
pos_flags = MagicMock(autospec=meshtastic.radioconfig_pb2.PositionFlags)
|
||||
|
||||
with patch('meshtastic.serial_interface.SerialInterface') as mo:
|
||||
mo().getNode().radioConfig.preferences.position_flags = 35
|
||||
with patch('meshtastic.radioconfig_pb2.PositionFlags', return_value=pos_flags) as mrc:
|
||||
# kind of cheating here, we are setting up the node
|
||||
mocked_node = MagicMock(autospec=Node)
|
||||
anode = mocked_node()
|
||||
anode.radioConfig.preferences.position_flags = 35
|
||||
Globals.getInstance().set_target_node(anode)
|
||||
|
||||
mrc.values.return_value = [0, 1, 2, 4, 8, 16, 32, 64, 128, 256]
|
||||
# Note: When you use side_effect and a list, each call will use a value from the front of the list then
|
||||
# remove that value from the list. If there are three values in the list, we expect it to be called
|
||||
# three times.
|
||||
mrc.Name.side_effect = [ 'POS_ALTITUDE', 'POS_ALT_MSL', 'POS_BATTERY' ]
|
||||
mrc.Name.side_effect = ['POS_ALTITUDE', 'POS_ALT_MSL', 'POS_BATTERY']
|
||||
|
||||
main()
|
||||
|
||||
@@ -1186,13 +1212,9 @@ def test_main_get_with_valid_values(capsys, reset_globals):
|
||||
|
||||
with patch('meshtastic.serial_interface.SerialInterface') as mo:
|
||||
|
||||
# kind of cheating here, we are setting up the node
|
||||
mocked_node = MagicMock(autospec=Node)
|
||||
anode = mocked_node()
|
||||
anode.radioConfig.preferences.wifi_ssid = 'foo'
|
||||
anode.radioConfig.preferences.ls_secs = 300
|
||||
anode.radioConfig.preferences.fixed_position = False
|
||||
Globals.getInstance().set_target_node(anode)
|
||||
mo().getNode().radioConfig.preferences.wifi_ssid = 'foo'
|
||||
mo().getNode().radioConfig.preferences.ls_secs = 300
|
||||
mo().getNode().radioConfig.preferences.fixed_position = False
|
||||
|
||||
main()
|
||||
|
||||
@@ -1421,7 +1443,7 @@ def test_main_export_config_called_from_main(capsys, reset_globals):
|
||||
@pytest.mark.unit
|
||||
def test_main_gpio_rd_no_gpio_channel(capsys, reset_globals):
|
||||
"""Test --gpio_rd with no named gpio channel"""
|
||||
sys.argv = ['', '--gpio-rd', '0x10']
|
||||
sys.argv = ['', '--gpio-rd', '0x10', '--dest', '!foo']
|
||||
Globals.getInstance().set_args(sys.argv)
|
||||
|
||||
iface = MagicMock(autospec=SerialInterface)
|
||||
@@ -1442,9 +1464,9 @@ def test_main_gpio_rd_no_dest(capsys, reset_globals):
|
||||
sys.argv = ['', '--gpio-rd', '0x2000']
|
||||
Globals.getInstance().set_args(sys.argv)
|
||||
|
||||
channel = Channel(index=1, role=1)
|
||||
channel.settings.modem_config = 3
|
||||
channel.settings.psk = b'\x01'
|
||||
channel = Channel(index=2, role=2)
|
||||
channel.settings.psk = b'\x8a\x94y\x0e\xc6\xc9\x1e5\x91\x12@\xa60\xa8\xb43\x87\x00\xf2K\x0e\xe7\x7fAz\xcd\xf5\xb0\x900\xa84'
|
||||
channel.settings.name = 'gpio'
|
||||
|
||||
iface = MagicMock(autospec=SerialInterface)
|
||||
iface.localNode.getChannelByName.return_value = channel
|
||||
|
||||
@@ -591,7 +591,7 @@ def test_showNodes_exclude_self(capsys, caplog, reset_globals, iface_with_nodes)
|
||||
capsys.readouterr()
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
@pytest.mark.unitslow
|
||||
def test_waitForConfig(caplog, capsys):
|
||||
"""Test waitForConfig()"""
|
||||
iface = MeshInterface(noProto=True)
|
||||
|
||||
@@ -882,7 +882,7 @@ def test_onResponseRequestSetting_with_error(capsys):
|
||||
assert err == ''
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
@pytest.mark.unitslow
|
||||
def test_waitForConfig():
|
||||
"""Test waitForConfig()"""
|
||||
anode = Node('foo', 'bar')
|
||||
|
||||
@@ -3,15 +3,19 @@
|
||||
import re
|
||||
|
||||
|
||||
from unittest.mock import patch
|
||||
from unittest.mock import patch, mock_open
|
||||
import pytest
|
||||
|
||||
from ..serial_interface import SerialInterface
|
||||
|
||||
@pytest.mark.unit
|
||||
@patch("time.sleep")
|
||||
@patch("termios.tcsetattr")
|
||||
@patch("termios.tcgetattr")
|
||||
@patch("builtins.open", new_callable=mock_open, read_data="data")
|
||||
@patch('serial.Serial')
|
||||
@patch('meshtastic.util.findPorts', return_value=['/dev/ttyUSBfake'])
|
||||
def test_SerialInterface_single_port(mocked_findPorts, mocked_serial, capsys):
|
||||
def test_SerialInterface_single_port(mocked_findPorts, mocked_serial, mocked_open, mock_get, mock_set, mock_sleep, capsys):
|
||||
"""Test that we can instantiate a SerialInterface with a single port"""
|
||||
iface = SerialInterface(noProto=True)
|
||||
iface.showInfo()
|
||||
|
||||
706
meshtastic/tests/test_smokevirt.py
Normal file
706
meshtastic/tests/test_smokevirt.py
Normal file
@@ -0,0 +1,706 @@
|
||||
"""Meshtastic smoke tests with a single virtual device via localhost.
|
||||
|
||||
During the CI build of the Meshtastic-device, a build.zip file is created.
|
||||
Inside that build.zip is a standalone executable meshtasticd_linux_amd64.
|
||||
That linux executable will simulate a Meshtastic device listening on localhost.
|
||||
|
||||
This smoke test runs against that localhost.
|
||||
|
||||
"""
|
||||
import re
|
||||
import subprocess
|
||||
import time
|
||||
import platform
|
||||
import os
|
||||
|
||||
# Do not like using hard coded sleeps, but it probably makes
|
||||
# sense to pause for the radio at apprpriate times
|
||||
import pytest
|
||||
|
||||
from ..util import findPorts
|
||||
|
||||
# seconds to pause after running a meshtastic command
|
||||
PAUSE_AFTER_COMMAND = 0.1
|
||||
PAUSE_AFTER_REBOOT = 0.1
|
||||
|
||||
|
||||
#TODO: need to fix the virtual device to have a reboot. When you issue the command
|
||||
# below, you get "FIXME implement reboot for this platform"
|
||||
#@pytest.mark.smokevirt
|
||||
#def test_smokevirt_reboot():
|
||||
# """Test reboot"""
|
||||
# return_value, _ = subprocess.getstatusoutput('meshtastic --host localhost --reboot')
|
||||
# assert return_value == 0
|
||||
# # pause for the radio to reset
|
||||
# time.sleep(8)
|
||||
|
||||
|
||||
@pytest.mark.smokevirt
|
||||
def test_smokevirt_info():
|
||||
"""Test --info"""
|
||||
return_value, out = subprocess.getstatusoutput('meshtastic --host localhost --info')
|
||||
assert re.match(r'Connected to radio', out)
|
||||
assert re.search(r'^Owner', out, re.MULTILINE)
|
||||
assert re.search(r'^My info', out, re.MULTILINE)
|
||||
assert re.search(r'^Nodes in mesh', out, re.MULTILINE)
|
||||
assert re.search(r'^Preferences', out, re.MULTILINE)
|
||||
assert re.search(r'^Channels', out, re.MULTILINE)
|
||||
assert re.search(r'^ PRIMARY', out, re.MULTILINE)
|
||||
assert re.search(r'^Primary channel URL', out, re.MULTILINE)
|
||||
assert return_value == 0
|
||||
|
||||
|
||||
@pytest.mark.smokevirt
|
||||
def test_smokevirt_sendping():
|
||||
"""Test --sendping"""
|
||||
return_value, out = subprocess.getstatusoutput('meshtastic --host localhost --sendping')
|
||||
assert re.match(r'Connected to radio', out)
|
||||
assert re.search(r'^Sending ping message', out, re.MULTILINE)
|
||||
assert return_value == 0
|
||||
|
||||
|
||||
@pytest.mark.smokevirt
|
||||
def test_get_with_invalid_setting():
|
||||
"""Test '--get a_bad_setting'."""
|
||||
return_value, out = subprocess.getstatusoutput('meshtastic --host localhost --get a_bad_setting')
|
||||
assert re.search(r'Choices in sorted order', out)
|
||||
assert return_value == 0
|
||||
|
||||
|
||||
@pytest.mark.smokevirt
|
||||
def test_set_with_invalid_setting():
|
||||
"""Test '--set a_bad_setting'."""
|
||||
return_value, out = subprocess.getstatusoutput('meshtastic --host localhost --set a_bad_setting foo')
|
||||
assert re.search(r'Choices in sorted order', out)
|
||||
assert return_value == 0
|
||||
|
||||
|
||||
@pytest.mark.smokevirt
|
||||
def test_ch_set_with_invalid_settingpatch_find_ports():
|
||||
"""Test '--ch-set with a_bad_setting'."""
|
||||
return_value, out = subprocess.getstatusoutput('meshtastic --host localhost --ch-set invalid_setting foo --ch-index 0')
|
||||
assert re.search(r'Choices in sorted order', out)
|
||||
assert return_value == 0
|
||||
|
||||
|
||||
@pytest.mark.smokevirt
|
||||
def test_smokevirt_pos_fields():
|
||||
"""Test --pos-fields (with some values POS_ALTITUDE POS_ALT_MSL POS_BATTERY)"""
|
||||
return_value, out = subprocess.getstatusoutput('meshtastic --host localhost --pos-fields POS_ALTITUDE POS_ALT_MSL POS_BATTERY')
|
||||
assert re.match(r'Connected to radio', out)
|
||||
assert re.search(r'^Setting position fields to 35', out, re.MULTILINE)
|
||||
assert return_value == 0
|
||||
# pause for the radio
|
||||
time.sleep(PAUSE_AFTER_COMMAND)
|
||||
return_value, out = subprocess.getstatusoutput('meshtastic --host localhost --pos-fields')
|
||||
assert re.match(r'Connected to radio', out)
|
||||
assert re.search(r'POS_ALTITUDE', out, re.MULTILINE)
|
||||
assert re.search(r'POS_ALT_MSL', out, re.MULTILINE)
|
||||
assert re.search(r'POS_BATTERY', out, re.MULTILINE)
|
||||
assert return_value == 0
|
||||
|
||||
|
||||
@pytest.mark.smokevirt
|
||||
def test_smokevirt_test_with_arg_but_no_hardware():
|
||||
"""Test --test
|
||||
Note: Since only one device is connected, it will not do much.
|
||||
"""
|
||||
return_value, out = subprocess.getstatusoutput('meshtastic --host localhost --test')
|
||||
assert re.search(r'^Warning: Must have at least two devices', out, re.MULTILINE)
|
||||
assert return_value == 1
|
||||
|
||||
|
||||
@pytest.mark.smokevirt
|
||||
def test_smokevirt_debug():
|
||||
"""Test --debug"""
|
||||
return_value, out = subprocess.getstatusoutput('meshtastic --host localhost --info --debug')
|
||||
assert re.search(r'^Owner', out, re.MULTILINE)
|
||||
assert re.search(r'^DEBUG file', out, re.MULTILINE)
|
||||
assert return_value == 0
|
||||
|
||||
|
||||
@pytest.mark.smokevirt
|
||||
def test_smokevirt_seriallog_to_file():
|
||||
"""Test --seriallog to a file creates a file"""
|
||||
filename = 'tmpoutput.txt'
|
||||
if os.path.exists(f"{filename}"):
|
||||
os.remove(f"{filename}")
|
||||
return_value, _ = subprocess.getstatusoutput(f'meshtastic --host localhost --info --seriallog {filename}')
|
||||
assert os.path.exists(f"{filename}")
|
||||
assert return_value == 0
|
||||
os.remove(f"{filename}")
|
||||
|
||||
|
||||
@pytest.mark.smokevirt
|
||||
def test_smokevirt_qr():
|
||||
"""Test --qr"""
|
||||
filename = 'tmpqr'
|
||||
if os.path.exists(f"{filename}"):
|
||||
os.remove(f"{filename}")
|
||||
return_value, _ = subprocess.getstatusoutput(f'meshtastic --host localhost --qr > {filename}')
|
||||
assert os.path.exists(f"{filename}")
|
||||
# not really testing that a valid qr code is created, just that the file size
|
||||
# is reasonably big enough for a qr code
|
||||
assert os.stat(f"{filename}").st_size > 20000
|
||||
assert return_value == 0
|
||||
os.remove(f"{filename}")
|
||||
|
||||
|
||||
@pytest.mark.smokevirt
|
||||
def test_smokevirt_nodes():
|
||||
"""Test --nodes"""
|
||||
return_value, out = subprocess.getstatusoutput('meshtastic --host localhost --nodes')
|
||||
assert re.match(r'Connected to radio', out)
|
||||
if platform.system() != 'Windows':
|
||||
assert re.search(r' User ', out, re.MULTILINE)
|
||||
assert re.search(r' 1 ', out, re.MULTILINE)
|
||||
assert return_value == 0
|
||||
|
||||
|
||||
@pytest.mark.smokevirt
|
||||
def test_smokevirt_send_hello():
|
||||
"""Test --sendtext hello"""
|
||||
return_value, out = subprocess.getstatusoutput('meshtastic --host localhost --sendtext hello')
|
||||
assert re.match(r'Connected to radio', out)
|
||||
assert re.search(r'^Sending text message hello to \^all', out, re.MULTILINE)
|
||||
assert return_value == 0
|
||||
|
||||
|
||||
@pytest.mark.smokevirt
|
||||
def test_smokevirt_port():
|
||||
"""Test --port"""
|
||||
# first, get the ports
|
||||
ports = findPorts()
|
||||
# hopefully there is none
|
||||
assert len(ports) == 0
|
||||
|
||||
|
||||
@pytest.mark.smokevirt
|
||||
def test_smokevirt_set_is_router_true():
|
||||
"""Test --set is_router true"""
|
||||
return_value, out = subprocess.getstatusoutput('meshtastic --host localhost --set is_router true')
|
||||
assert re.match(r'Connected to radio', out)
|
||||
assert re.search(r'^Set is_router to true', out, re.MULTILINE)
|
||||
assert return_value == 0
|
||||
# pause for the radio
|
||||
time.sleep(PAUSE_AFTER_COMMAND)
|
||||
return_value, out = subprocess.getstatusoutput('meshtastic --host localhost --get is_router')
|
||||
assert re.search(r'^is_router: True', out, re.MULTILINE)
|
||||
assert return_value == 0
|
||||
|
||||
|
||||
@pytest.mark.smokevirt
|
||||
def test_smokevirt_set_location_info():
|
||||
"""Test --setlat, --setlon and --setalt """
|
||||
return_value, out = subprocess.getstatusoutput('meshtastic --host localhost --setlat 32.7767 --setlon -96.7970 --setalt 1337')
|
||||
assert re.match(r'Connected to radio', out)
|
||||
assert re.search(r'^Fixing altitude', out, re.MULTILINE)
|
||||
assert re.search(r'^Fixing latitude', out, re.MULTILINE)
|
||||
assert re.search(r'^Fixing longitude', out, re.MULTILINE)
|
||||
assert return_value == 0
|
||||
# pause for the radio
|
||||
time.sleep(PAUSE_AFTER_COMMAND)
|
||||
return_value, out2 = subprocess.getstatusoutput('meshtastic --host localhost --info')
|
||||
assert re.search(r'1337', out2, re.MULTILINE)
|
||||
assert re.search(r'32.7767', out2, re.MULTILINE)
|
||||
assert re.search(r'-96.797', out2, re.MULTILINE)
|
||||
assert return_value == 0
|
||||
|
||||
|
||||
@pytest.mark.smokevirt
|
||||
def test_smokevirt_set_is_router_false():
|
||||
"""Test --set is_router false"""
|
||||
return_value, out = subprocess.getstatusoutput('meshtastic --host localhost --set is_router false')
|
||||
assert re.match(r'Connected to radio', out)
|
||||
assert re.search(r'^Set is_router to false', out, re.MULTILINE)
|
||||
assert return_value == 0
|
||||
# pause for the radio
|
||||
time.sleep(PAUSE_AFTER_COMMAND)
|
||||
return_value, out = subprocess.getstatusoutput('meshtastic --host localhost --get is_router')
|
||||
assert re.search(r'^is_router: False', out, re.MULTILINE)
|
||||
assert return_value == 0
|
||||
|
||||
|
||||
@pytest.mark.smokevirt
|
||||
def test_smokevirt_set_owner():
|
||||
"""Test --set-owner name"""
|
||||
# make sure the owner is not Joe
|
||||
return_value, out = subprocess.getstatusoutput('meshtastic --host localhost --set-owner Bob')
|
||||
assert re.match(r'Connected to radio', out)
|
||||
assert re.search(r'^Setting device owner to Bob', out, re.MULTILINE)
|
||||
assert return_value == 0
|
||||
# pause for the radio
|
||||
time.sleep(PAUSE_AFTER_COMMAND)
|
||||
return_value, out = subprocess.getstatusoutput('meshtastic --host localhost --info')
|
||||
assert not re.search(r'Owner: Joe', out, re.MULTILINE)
|
||||
assert return_value == 0
|
||||
# pause for the radio
|
||||
time.sleep(PAUSE_AFTER_COMMAND)
|
||||
return_value, out = subprocess.getstatusoutput('meshtastic --host localhost --set-owner Joe')
|
||||
assert re.match(r'Connected to radio', out)
|
||||
assert re.search(r'^Setting device owner to Joe', out, re.MULTILINE)
|
||||
assert return_value == 0
|
||||
# pause for the radio
|
||||
time.sleep(PAUSE_AFTER_COMMAND)
|
||||
return_value, out = subprocess.getstatusoutput('meshtastic --host localhost --info')
|
||||
assert re.search(r'Owner: Joe', out, re.MULTILINE)
|
||||
assert return_value == 0
|
||||
|
||||
|
||||
@pytest.mark.smokevirt
|
||||
def test_smokevirt_set_team():
|
||||
"""Test --set-team """
|
||||
# unset the team
|
||||
return_value, out = subprocess.getstatusoutput('meshtastic --host localhost --set-team CLEAR')
|
||||
assert re.match(r'Connected to radio', out)
|
||||
assert re.search(r'^Setting team to CLEAR', out, re.MULTILINE)
|
||||
assert return_value == 0
|
||||
# pause for the radio
|
||||
time.sleep(PAUSE_AFTER_REBOOT)
|
||||
return_value, out = subprocess.getstatusoutput('meshtastic --host localhost --set-team CYAN')
|
||||
assert re.search(r'Setting team to CYAN', out, re.MULTILINE)
|
||||
assert return_value == 0
|
||||
# pause for the radio
|
||||
time.sleep(PAUSE_AFTER_REBOOT)
|
||||
return_value, out = subprocess.getstatusoutput('meshtastic --host localhost --info')
|
||||
assert re.search(r'CYAN', out, re.MULTILINE)
|
||||
assert return_value == 0
|
||||
|
||||
|
||||
@pytest.mark.smokevirt
|
||||
def test_smokevirt_ch_values():
|
||||
"""Test --ch-longslow, --ch-longfast, --ch-mediumslow, --ch-mediumsfast,
|
||||
--ch-shortslow, and --ch-shortfast arguments
|
||||
"""
|
||||
exp = {
|
||||
'--ch-longslow': 'Bw125Cr48Sf4096',
|
||||
'--ch-longfast': 'Bw31_25Cr48Sf512',
|
||||
'--ch-mediumslow': 'Bw250Cr46Sf2048',
|
||||
'--ch-mediumfast': 'Bw250Cr47Sf1024',
|
||||
'--ch-shortslow': '{ "psk',
|
||||
'--ch-shortfast': 'Bw500Cr45Sf128'
|
||||
}
|
||||
|
||||
for key, val in exp.items():
|
||||
return_value, out = subprocess.getstatusoutput(f'meshtastic --host localhost {key}')
|
||||
assert re.match(r'Connected to radio', out)
|
||||
assert re.search(r'Writing modified channels to device', out, re.MULTILINE)
|
||||
assert return_value == 0
|
||||
# pause for the radio (might reboot)
|
||||
time.sleep(PAUSE_AFTER_REBOOT)
|
||||
return_value, out = subprocess.getstatusoutput('meshtastic --host localhost --info')
|
||||
assert re.search(val, out, re.MULTILINE)
|
||||
assert return_value == 0
|
||||
# pause for the radio
|
||||
time.sleep(PAUSE_AFTER_COMMAND)
|
||||
|
||||
|
||||
@pytest.mark.smokevirt
|
||||
def test_smokevirt_ch_set_name():
|
||||
"""Test --ch-set name"""
|
||||
return_value, out = subprocess.getstatusoutput('meshtastic --host localhost --info')
|
||||
assert not re.search(r'MyChannel', out, re.MULTILINE)
|
||||
assert return_value == 0
|
||||
# pause for the radio
|
||||
time.sleep(PAUSE_AFTER_COMMAND)
|
||||
return_value, out = subprocess.getstatusoutput('meshtastic --host localhost --ch-set name MyChannel')
|
||||
assert re.match(r'Connected to radio', out)
|
||||
assert re.search(r'Warning: Need to specify', out, re.MULTILINE)
|
||||
assert return_value == 1
|
||||
# pause for the radio
|
||||
time.sleep(PAUSE_AFTER_COMMAND)
|
||||
return_value, out = subprocess.getstatusoutput('meshtastic --host localhost --ch-set name MyChannel --ch-index 0')
|
||||
assert re.match(r'Connected to radio', out)
|
||||
assert re.search(r'^Set name to MyChannel', out, re.MULTILINE)
|
||||
assert return_value == 0
|
||||
# pause for the radio
|
||||
time.sleep(PAUSE_AFTER_COMMAND)
|
||||
return_value, out = subprocess.getstatusoutput('meshtastic --host localhost --info')
|
||||
assert re.search(r'MyChannel', out, re.MULTILINE)
|
||||
assert return_value == 0
|
||||
|
||||
|
||||
@pytest.mark.smokevirt
|
||||
def test_smokevirt_ch_set_downlink_and_uplink():
|
||||
"""Test -ch-set downlink_enabled X and --ch-set uplink_enabled X"""
|
||||
return_value, out = subprocess.getstatusoutput('meshtastic --host localhost --ch-set downlink_enabled false --ch-set uplink_enabled false')
|
||||
assert re.match(r'Connected to radio', out)
|
||||
assert re.search(r'Warning: Need to specify', out, re.MULTILINE)
|
||||
assert return_value == 1
|
||||
# pause for the radio
|
||||
time.sleep(PAUSE_AFTER_COMMAND)
|
||||
# pylint: disable=C0301
|
||||
return_value, out = subprocess.getstatusoutput('meshtastic --host localhost --ch-set downlink_enabled false --ch-set uplink_enabled false --ch-index 0')
|
||||
assert re.match(r'Connected to radio', out)
|
||||
assert return_value == 0
|
||||
# pause for the radio
|
||||
time.sleep(PAUSE_AFTER_COMMAND)
|
||||
return_value, out = subprocess.getstatusoutput('meshtastic --host localhost --info')
|
||||
assert not re.search(r'uplinkEnabled', out, re.MULTILINE)
|
||||
assert not re.search(r'downlinkEnabled', out, re.MULTILINE)
|
||||
assert return_value == 0
|
||||
# pause for the radio
|
||||
time.sleep(PAUSE_AFTER_COMMAND)
|
||||
# pylint: disable=C0301
|
||||
return_value, out = subprocess.getstatusoutput('meshtastic --host localhost --ch-set downlink_enabled true --ch-set uplink_enabled true --ch-index 0')
|
||||
assert re.match(r'Connected to radio', out)
|
||||
assert re.search(r'^Set downlink_enabled to true', out, re.MULTILINE)
|
||||
assert re.search(r'^Set uplink_enabled to true', out, re.MULTILINE)
|
||||
assert return_value == 0
|
||||
# pause for the radio
|
||||
time.sleep(PAUSE_AFTER_COMMAND)
|
||||
return_value, out = subprocess.getstatusoutput('meshtastic --host localhost --info')
|
||||
assert re.search(r'uplinkEnabled', out, re.MULTILINE)
|
||||
assert re.search(r'downlinkEnabled', out, re.MULTILINE)
|
||||
assert return_value == 0
|
||||
|
||||
|
||||
@pytest.mark.smokevirt
|
||||
def test_smokevirt_ch_add_and_ch_del():
|
||||
"""Test --ch-add"""
|
||||
return_value, out = subprocess.getstatusoutput('meshtastic --host localhost --ch-add testing')
|
||||
assert re.search(r'Writing modified channels to device', out, re.MULTILINE)
|
||||
assert return_value == 0
|
||||
# pause for the radio
|
||||
time.sleep(PAUSE_AFTER_COMMAND)
|
||||
return_value, out = subprocess.getstatusoutput('meshtastic --host localhost --info')
|
||||
assert re.match(r'Connected to radio', out)
|
||||
assert re.search(r'SECONDARY', out, re.MULTILINE)
|
||||
assert re.search(r'testing', out, re.MULTILINE)
|
||||
assert return_value == 0
|
||||
# pause for the radio
|
||||
time.sleep(PAUSE_AFTER_COMMAND)
|
||||
return_value, out = subprocess.getstatusoutput('meshtastic --host localhost --ch-index 1 --ch-del')
|
||||
assert re.search(r'Deleting channel 1', out, re.MULTILINE)
|
||||
assert return_value == 0
|
||||
# pause for the radio
|
||||
time.sleep(PAUSE_AFTER_REBOOT)
|
||||
# make sure the secondar channel is not there
|
||||
return_value, out = subprocess.getstatusoutput('meshtastic --host localhost --info')
|
||||
assert re.match(r'Connected to radio', out)
|
||||
assert not re.search(r'SECONDARY', out, re.MULTILINE)
|
||||
assert not re.search(r'testing', out, re.MULTILINE)
|
||||
assert return_value == 0
|
||||
|
||||
|
||||
@pytest.mark.smokevirt
|
||||
def test_smokevirt_ch_enable_and_disable():
|
||||
"""Test --ch-enable and --ch-disable"""
|
||||
return_value, out = subprocess.getstatusoutput('meshtastic --host localhost --ch-add testing')
|
||||
assert re.search(r'Writing modified channels to device', out, re.MULTILINE)
|
||||
assert return_value == 0
|
||||
# pause for the radio
|
||||
time.sleep(PAUSE_AFTER_COMMAND)
|
||||
return_value, out = subprocess.getstatusoutput('meshtastic --host localhost --info')
|
||||
assert re.match(r'Connected to radio', out)
|
||||
assert re.search(r'SECONDARY', out, re.MULTILINE)
|
||||
assert re.search(r'testing', out, re.MULTILINE)
|
||||
assert return_value == 0
|
||||
# pause for the radio
|
||||
time.sleep(PAUSE_AFTER_COMMAND)
|
||||
# ensure they need to specify a --ch-index
|
||||
return_value, out = subprocess.getstatusoutput('meshtastic --host localhost --ch-disable')
|
||||
assert return_value == 1
|
||||
# pause for the radio
|
||||
time.sleep(PAUSE_AFTER_COMMAND)
|
||||
return_value, out = subprocess.getstatusoutput('meshtastic --host localhost --ch-disable --ch-index 1')
|
||||
assert return_value == 0
|
||||
# pause for the radio
|
||||
time.sleep(PAUSE_AFTER_COMMAND)
|
||||
return_value, out = subprocess.getstatusoutput('meshtastic --host localhost --info')
|
||||
assert re.match(r'Connected to radio', out)
|
||||
assert re.search(r'DISABLED', out, re.MULTILINE)
|
||||
assert re.search(r'testing', out, re.MULTILINE)
|
||||
assert return_value == 0
|
||||
# pause for the radio
|
||||
time.sleep(PAUSE_AFTER_COMMAND)
|
||||
return_value, out = subprocess.getstatusoutput('meshtastic --host localhost --ch-enable --ch-index 1')
|
||||
assert return_value == 0
|
||||
# pause for the radio
|
||||
time.sleep(PAUSE_AFTER_COMMAND)
|
||||
return_value, out = subprocess.getstatusoutput('meshtastic --host localhost --info')
|
||||
assert re.match(r'Connected to radio', out)
|
||||
assert re.search(r'SECONDARY', out, re.MULTILINE)
|
||||
assert re.search(r'testing', out, re.MULTILINE)
|
||||
assert return_value == 0
|
||||
# pause for the radio
|
||||
time.sleep(PAUSE_AFTER_COMMAND)
|
||||
return_value, out = subprocess.getstatusoutput('meshtastic --host localhost --ch-del --ch-index 1')
|
||||
assert return_value == 0
|
||||
# pause for the radio
|
||||
time.sleep(PAUSE_AFTER_COMMAND)
|
||||
|
||||
|
||||
@pytest.mark.smokevirt
|
||||
def test_smokevirt_ch_del_a_disabled_non_primary_channel():
|
||||
"""Test --ch-del will work on a disabled non-primary channel."""
|
||||
return_value, out = subprocess.getstatusoutput('meshtastic --host localhost --ch-add testing')
|
||||
assert re.search(r'Writing modified channels to device', out, re.MULTILINE)
|
||||
assert return_value == 0
|
||||
# pause for the radio
|
||||
time.sleep(PAUSE_AFTER_COMMAND)
|
||||
return_value, out = subprocess.getstatusoutput('meshtastic --host localhost --info')
|
||||
assert re.match(r'Connected to radio', out)
|
||||
assert re.search(r'SECONDARY', out, re.MULTILINE)
|
||||
assert re.search(r'testing', out, re.MULTILINE)
|
||||
assert return_value == 0
|
||||
# pause for the radio
|
||||
time.sleep(PAUSE_AFTER_COMMAND)
|
||||
# ensure they need to specify a --ch-index
|
||||
return_value, out = subprocess.getstatusoutput('meshtastic --host localhost --ch-disable')
|
||||
assert return_value == 1
|
||||
# pause for the radio
|
||||
time.sleep(PAUSE_AFTER_COMMAND)
|
||||
return_value, out = subprocess.getstatusoutput('meshtastic --host localhost --ch-del --ch-index 1')
|
||||
assert return_value == 0
|
||||
# pause for the radio
|
||||
time.sleep(PAUSE_AFTER_COMMAND)
|
||||
return_value, out = subprocess.getstatusoutput('meshtastic --host localhost --info')
|
||||
assert re.match(r'Connected to radio', out)
|
||||
assert not re.search(r'DISABLED', out, re.MULTILINE)
|
||||
assert not re.search(r'SECONDARY', out, re.MULTILINE)
|
||||
assert not re.search(r'testing', out, re.MULTILINE)
|
||||
assert return_value == 0
|
||||
# pause for the radio
|
||||
time.sleep(PAUSE_AFTER_COMMAND)
|
||||
|
||||
|
||||
@pytest.mark.smokevirt
|
||||
def test_smokevirt_attempt_to_delete_primary_channel():
|
||||
"""Test that we cannot delete the PRIMARY channel."""
|
||||
return_value, out = subprocess.getstatusoutput('meshtastic --host localhost --ch-del --ch-index 0')
|
||||
assert re.search(r'Warning: Cannot delete primary channel', out, re.MULTILINE)
|
||||
assert return_value == 1
|
||||
# pause for the radio
|
||||
time.sleep(PAUSE_AFTER_COMMAND)
|
||||
|
||||
|
||||
@pytest.mark.smokevirt
|
||||
def test_smokevirt_attempt_to_disable_primary_channel():
|
||||
"""Test that we cannot disable the PRIMARY channel."""
|
||||
return_value, out = subprocess.getstatusoutput('meshtastic --host localhost --ch-disable --ch-index 0')
|
||||
assert re.search(r'Warning: Cannot enable', out, re.MULTILINE)
|
||||
assert return_value == 1
|
||||
# pause for the radio
|
||||
time.sleep(PAUSE_AFTER_COMMAND)
|
||||
|
||||
|
||||
@pytest.mark.smokevirt
|
||||
def test_smokevirt_attempt_to_enable_primary_channel():
|
||||
"""Test that we cannot enable the PRIMARY channel."""
|
||||
return_value, out = subprocess.getstatusoutput('meshtastic --host localhost --ch-enable --ch-index 0')
|
||||
assert re.search(r'Warning: Cannot enable', out, re.MULTILINE)
|
||||
assert return_value == 1
|
||||
# pause for the radio
|
||||
time.sleep(PAUSE_AFTER_COMMAND)
|
||||
|
||||
|
||||
@pytest.mark.smokevirt
|
||||
def test_smokevirt_ensure_ch_del_second_of_three_channels():
|
||||
"""Test that when we delete the 2nd of 3 channels, that it deletes the correct channel."""
|
||||
return_value, out = subprocess.getstatusoutput('meshtastic --host localhost --ch-add testing1')
|
||||
assert re.match(r'Connected to radio', out)
|
||||
assert return_value == 0
|
||||
# pause for the radio
|
||||
time.sleep(PAUSE_AFTER_COMMAND)
|
||||
return_value, out = subprocess.getstatusoutput('meshtastic --host localhost --info')
|
||||
assert re.match(r'Connected to radio', out)
|
||||
assert re.search(r'SECONDARY', out, re.MULTILINE)
|
||||
assert re.search(r'testing1', out, re.MULTILINE)
|
||||
assert return_value == 0
|
||||
# pause for the radio
|
||||
time.sleep(PAUSE_AFTER_COMMAND)
|
||||
return_value, out = subprocess.getstatusoutput('meshtastic --host localhost --ch-add testing2')
|
||||
assert re.match(r'Connected to radio', out)
|
||||
assert return_value == 0
|
||||
# pause for the radio
|
||||
time.sleep(PAUSE_AFTER_COMMAND)
|
||||
return_value, out = subprocess.getstatusoutput('meshtastic --host localhost --info')
|
||||
assert re.match(r'Connected to radio', out)
|
||||
assert re.search(r'testing2', out, re.MULTILINE)
|
||||
assert return_value == 0
|
||||
# pause for the radio
|
||||
time.sleep(PAUSE_AFTER_COMMAND)
|
||||
return_value, out = subprocess.getstatusoutput('meshtastic --host localhost --ch-del --ch-index 1')
|
||||
assert re.match(r'Connected to radio', out)
|
||||
assert return_value == 0
|
||||
# pause for the radio
|
||||
time.sleep(PAUSE_AFTER_COMMAND)
|
||||
return_value, out = subprocess.getstatusoutput('meshtastic --host localhost --info')
|
||||
assert re.match(r'Connected to radio', out)
|
||||
assert re.search(r'testing2', out, re.MULTILINE)
|
||||
assert return_value == 0
|
||||
# pause for the radio
|
||||
time.sleep(PAUSE_AFTER_COMMAND)
|
||||
return_value, out = subprocess.getstatusoutput('meshtastic --host localhost --ch-del --ch-index 1')
|
||||
assert re.match(r'Connected to radio', out)
|
||||
assert return_value == 0
|
||||
# pause for the radio
|
||||
time.sleep(PAUSE_AFTER_COMMAND)
|
||||
|
||||
|
||||
@pytest.mark.smokevirt
|
||||
def test_smokevirt_ensure_ch_del_third_of_three_channels():
|
||||
"""Test that when we delete the 3rd of 3 channels, that it deletes the correct channel."""
|
||||
return_value, out = subprocess.getstatusoutput('meshtastic --host localhost --ch-add testing1')
|
||||
assert re.match(r'Connected to radio', out)
|
||||
assert return_value == 0
|
||||
# pause for the radio
|
||||
time.sleep(PAUSE_AFTER_COMMAND)
|
||||
return_value, out = subprocess.getstatusoutput('meshtastic --host localhost --info')
|
||||
assert re.match(r'Connected to radio', out)
|
||||
assert re.search(r'SECONDARY', out, re.MULTILINE)
|
||||
assert re.search(r'testing1', out, re.MULTILINE)
|
||||
assert return_value == 0
|
||||
# pause for the radio
|
||||
time.sleep(PAUSE_AFTER_COMMAND)
|
||||
return_value, out = subprocess.getstatusoutput('meshtastic --host localhost --ch-add testing2')
|
||||
assert re.match(r'Connected to radio', out)
|
||||
assert return_value == 0
|
||||
# pause for the radio
|
||||
time.sleep(PAUSE_AFTER_COMMAND)
|
||||
return_value, out = subprocess.getstatusoutput('meshtastic --host localhost --info')
|
||||
assert re.match(r'Connected to radio', out)
|
||||
assert re.search(r'testing2', out, re.MULTILINE)
|
||||
assert return_value == 0
|
||||
# pause for the radio
|
||||
time.sleep(PAUSE_AFTER_COMMAND)
|
||||
return_value, out = subprocess.getstatusoutput('meshtastic --host localhost --ch-del --ch-index 2')
|
||||
assert re.match(r'Connected to radio', out)
|
||||
assert return_value == 0
|
||||
# pause for the radio
|
||||
time.sleep(PAUSE_AFTER_COMMAND)
|
||||
return_value, out = subprocess.getstatusoutput('meshtastic --host localhost --info')
|
||||
assert re.match(r'Connected to radio', out)
|
||||
assert re.search(r'testing1', out, re.MULTILINE)
|
||||
assert return_value == 0
|
||||
# pause for the radio
|
||||
time.sleep(PAUSE_AFTER_COMMAND)
|
||||
return_value, out = subprocess.getstatusoutput('meshtastic --host localhost --ch-del --ch-index 1')
|
||||
assert re.match(r'Connected to radio', out)
|
||||
assert return_value == 0
|
||||
# pause for the radio
|
||||
time.sleep(PAUSE_AFTER_COMMAND)
|
||||
|
||||
|
||||
@pytest.mark.smokevirt
|
||||
def test_smokevirt_ch_set_modem_config():
|
||||
"""Test --ch-set modem_config"""
|
||||
return_value, out = subprocess.getstatusoutput('meshtastic --host localhost --ch-set modem_config Bw31_25Cr48Sf512')
|
||||
assert re.search(r'Warning: Need to specify', out, re.MULTILINE)
|
||||
assert return_value == 1
|
||||
# pause for the radio
|
||||
time.sleep(PAUSE_AFTER_COMMAND)
|
||||
return_value, out = subprocess.getstatusoutput('meshtastic --host localhost --info')
|
||||
assert not re.search(r'Bw31_25Cr48Sf512', out, re.MULTILINE)
|
||||
assert return_value == 0
|
||||
# pause for the radio
|
||||
time.sleep(PAUSE_AFTER_COMMAND)
|
||||
return_value, out = subprocess.getstatusoutput('meshtastic --host localhost --ch-set modem_config Bw31_25Cr48Sf512 --ch-index 0')
|
||||
assert re.match(r'Connected to radio', out)
|
||||
assert re.search(r'^Set modem_config to Bw31_25Cr48Sf512', out, re.MULTILINE)
|
||||
assert return_value == 0
|
||||
# pause for the radio
|
||||
time.sleep(PAUSE_AFTER_COMMAND)
|
||||
return_value, out = subprocess.getstatusoutput('meshtastic --host localhost --info')
|
||||
assert re.search(r'Bw31_25Cr48Sf512', out, re.MULTILINE)
|
||||
assert return_value == 0
|
||||
|
||||
|
||||
@pytest.mark.smokevirt
|
||||
def test_smokevirt_seturl_default():
|
||||
"""Test --seturl with default value"""
|
||||
# set some channel value so we no longer have a default channel
|
||||
return_value, out = subprocess.getstatusoutput('meshtastic --host localhost --ch-set name foo --ch-index 0')
|
||||
assert return_value == 0
|
||||
# pause for the radio
|
||||
time.sleep(PAUSE_AFTER_COMMAND)
|
||||
# ensure we no longer have a default primary channel
|
||||
return_value, out = subprocess.getstatusoutput('meshtastic --host localhost --info')
|
||||
assert not re.search('CgUYAyIBAQ', out, re.MULTILINE)
|
||||
assert return_value == 0
|
||||
url = "https://www.meshtastic.org/d/#CgUYAyIBAQ"
|
||||
return_value, out = subprocess.getstatusoutput(f"meshtastic --host localhost --seturl {url}")
|
||||
assert re.match(r'Connected to radio', out)
|
||||
assert return_value == 0
|
||||
# pause for the radio
|
||||
time.sleep(PAUSE_AFTER_COMMAND)
|
||||
return_value, out = subprocess.getstatusoutput('meshtastic --host localhost --info')
|
||||
assert re.search('CgUYAyIBAQ', out, re.MULTILINE)
|
||||
assert return_value == 0
|
||||
|
||||
|
||||
@pytest.mark.smokevirt
|
||||
def test_smokevirt_seturl_invalid_url():
|
||||
"""Test --seturl with invalid url"""
|
||||
# Note: This url is no longer a valid url.
|
||||
url = "https://www.meshtastic.org/c/#GAMiENTxuzogKQdZ8Lz_q89Oab8qB0RlZmF1bHQ="
|
||||
return_value, out = subprocess.getstatusoutput(f"meshtastic --host localhost --seturl {url}")
|
||||
assert re.match(r'Connected to radio', out)
|
||||
assert re.search('Warning: There were no settings', out, re.MULTILINE)
|
||||
assert return_value == 1
|
||||
# pause for the radio
|
||||
time.sleep(PAUSE_AFTER_COMMAND)
|
||||
|
||||
|
||||
@pytest.mark.smokevirt
|
||||
def test_smokevirt_configure():
|
||||
"""Test --configure"""
|
||||
_ , out = subprocess.getstatusoutput(f"meshtastic --host localhost --configure example_config.yaml")
|
||||
assert re.match(r'Connected to radio', out)
|
||||
assert re.search('^Setting device owner to Bob TBeam', out, re.MULTILINE)
|
||||
assert re.search('^Fixing altitude at 304 meters', out, re.MULTILINE)
|
||||
assert re.search('^Fixing latitude at 35.8', out, re.MULTILINE)
|
||||
assert re.search('^Fixing longitude at -93.8', out, re.MULTILINE)
|
||||
assert re.search('^Setting device position', out, re.MULTILINE)
|
||||
assert re.search('^Set region to 1', out, re.MULTILINE)
|
||||
assert re.search('^Set is_always_powered to true', out, re.MULTILINE)
|
||||
assert re.search('^Set send_owner_interval to 2', out, re.MULTILINE)
|
||||
assert re.search('^Set screen_on_secs to 31536000', out, re.MULTILINE)
|
||||
assert re.search('^Set wait_bluetooth_secs to 31536000', out, re.MULTILINE)
|
||||
assert re.search('^Writing modified preferences to device', out, re.MULTILINE)
|
||||
# pause for the radio
|
||||
time.sleep(PAUSE_AFTER_REBOOT)
|
||||
|
||||
|
||||
@pytest.mark.smokevirt
|
||||
def test_smokevirt_set_ham():
|
||||
"""Test --set-ham
|
||||
Note: Do a factory reset after this setting so it is very short-lived.
|
||||
"""
|
||||
return_value, out = subprocess.getstatusoutput('meshtastic --host localhost --set-ham KI1234')
|
||||
assert re.search(r'Setting Ham ID', out, re.MULTILINE)
|
||||
assert return_value == 0
|
||||
# pause for the radio
|
||||
time.sleep(PAUSE_AFTER_REBOOT)
|
||||
return_value, out = subprocess.getstatusoutput('meshtastic --host localhost --info')
|
||||
assert re.search(r'Owner: KI1234', out, re.MULTILINE)
|
||||
assert return_value == 0
|
||||
|
||||
|
||||
@pytest.mark.smokevirt
|
||||
def test_smokevirt_set_wifi_settings():
|
||||
"""Test --set wifi_ssid and --set wifi_password"""
|
||||
return_value, out = subprocess.getstatusoutput('meshtastic --host localhost --set wifi_ssid "some_ssid" --set wifi_password "temp1234"')
|
||||
assert re.match(r'Connected to radio', out)
|
||||
assert re.search(r'^Set wifi_ssid to some_ssid', out, re.MULTILINE)
|
||||
assert re.search(r'^Set wifi_password to temp1234', out, re.MULTILINE)
|
||||
assert return_value == 0
|
||||
# pause for the radio
|
||||
time.sleep(PAUSE_AFTER_COMMAND)
|
||||
return_value, out = subprocess.getstatusoutput('meshtastic --host localhost --get wifi_ssid --get wifi_password')
|
||||
assert re.search(r'^wifi_ssid: some_ssid', out, re.MULTILINE)
|
||||
assert re.search(r'^wifi_password: sekrit', out, re.MULTILINE)
|
||||
assert return_value == 0
|
||||
|
||||
|
||||
@pytest.mark.smokevirt
|
||||
def test_smokevirt_factory_reset():
|
||||
"""Test factory reset"""
|
||||
return_value, out = subprocess.getstatusoutput('meshtastic --host localhost --set factory_reset true')
|
||||
assert re.match(r'Connected to radio', out)
|
||||
assert re.search(r'^Set factory_reset to true', out, re.MULTILINE)
|
||||
assert re.search(r'^Writing modified preferences to device', out, re.MULTILINE)
|
||||
assert return_value == 0
|
||||
# NOTE: The virtual radio will not respond well after this command. Need to re-start the virtual program at this point.
|
||||
# TODO: fix?
|
||||
@@ -99,7 +99,7 @@ def test_onTunnelReceive_from_someone_else(mock_platform_system, caplog, reset_g
|
||||
assert re.search(r'in onTunnelReceive', caplog.text, re.MULTILINE)
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
@pytest.mark.unitslow
|
||||
@patch('platform.system')
|
||||
def test_shouldFilterPacket_random(mock_platform_system, caplog, reset_globals, iface_with_nodes):
|
||||
"""Test _shouldFilterPacket()"""
|
||||
@@ -117,7 +117,7 @@ def test_shouldFilterPacket_random(mock_platform_system, caplog, reset_globals,
|
||||
assert not ignore
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
@pytest.mark.unitslow
|
||||
@patch('platform.system')
|
||||
def test_shouldFilterPacket_in_blacklist(mock_platform_system, caplog, reset_globals, iface_with_nodes):
|
||||
"""Test _shouldFilterPacket()"""
|
||||
@@ -135,7 +135,7 @@ def test_shouldFilterPacket_in_blacklist(mock_platform_system, caplog, reset_glo
|
||||
assert ignore
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
@pytest.mark.unitslow
|
||||
@patch('platform.system')
|
||||
def test_shouldFilterPacket_icmp(mock_platform_system, caplog, reset_globals, iface_with_nodes):
|
||||
"""Test _shouldFilterPacket()"""
|
||||
@@ -173,7 +173,7 @@ def test_shouldFilterPacket_udp(mock_platform_system, caplog, reset_globals, ifa
|
||||
assert not ignore
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
@pytest.mark.unitslow
|
||||
@patch('platform.system')
|
||||
def test_shouldFilterPacket_udp_blacklisted(mock_platform_system, caplog, reset_globals, iface_with_nodes):
|
||||
"""Test _shouldFilterPacket()"""
|
||||
@@ -213,7 +213,7 @@ def test_shouldFilterPacket_tcp(mock_platform_system, caplog, reset_globals, ifa
|
||||
assert not ignore
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
@pytest.mark.unitslow
|
||||
@patch('platform.system')
|
||||
def test_shouldFilterPacket_tcp_blacklisted(mock_platform_system, caplog, reset_globals, iface_with_nodes):
|
||||
"""Test _shouldFilterPacket()"""
|
||||
@@ -234,7 +234,7 @@ def test_shouldFilterPacket_tcp_blacklisted(mock_platform_system, caplog, reset_
|
||||
assert ignore
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
@pytest.mark.unitslow
|
||||
@patch('platform.system')
|
||||
def test_ipToNodeId_none(mock_platform_system, caplog, reset_globals, iface_with_nodes):
|
||||
"""Test _ipToNodeId()"""
|
||||
@@ -250,7 +250,7 @@ def test_ipToNodeId_none(mock_platform_system, caplog, reset_globals, iface_with
|
||||
assert nodeid is None
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
@pytest.mark.unitslow
|
||||
@patch('platform.system')
|
||||
def test_ipToNodeId_all(mock_platform_system, caplog, reset_globals, iface_with_nodes):
|
||||
"""Test _ipToNodeId()"""
|
||||
|
||||
@@ -88,7 +88,7 @@ def test_pskToString_one_byte_zero_value():
|
||||
assert pskToString(bytes([0x00])) == 'unencrypted'
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
@pytest.mark.unitslow
|
||||
def test_pskToString_one_byte_non_zero_value():
|
||||
"""Test pskToString one byte that is non-zero"""
|
||||
assert pskToString(bytes([0x01])) == 'default'
|
||||
@@ -118,7 +118,7 @@ def test_our_exit_zero_return_value(capsys):
|
||||
assert pytest_wrapped_e.value.code == 0
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
@pytest.mark.unitslow
|
||||
def test_our_exit_non_zero_return_value(capsys):
|
||||
"""Test our_exit with a non-zero return value"""
|
||||
with pytest.raises(SystemExit) as pytest_wrapped_e:
|
||||
@@ -130,7 +130,7 @@ def test_our_exit_non_zero_return_value(capsys):
|
||||
assert pytest_wrapped_e.value.code == 1
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
@pytest.mark.unitslow
|
||||
def test_fixme():
|
||||
"""Test fixme()"""
|
||||
with pytest.raises(Exception) as pytest_wrapped_e:
|
||||
@@ -166,7 +166,7 @@ def test_remove_keys_from_dict_empty_keys_empty_dict():
|
||||
assert not remove_keys_from_dict((), {})
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
@pytest.mark.unitslow
|
||||
def test_remove_keys_from_dict_empty_dict():
|
||||
"""Test when dict is empty"""
|
||||
assert not remove_keys_from_dict(('a'), {})
|
||||
@@ -178,13 +178,13 @@ def test_remove_keys_from_dict_empty_keys():
|
||||
assert remove_keys_from_dict((), {'a':1}) == {'a':1}
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
@pytest.mark.unitslow
|
||||
def test_remove_keys_from_dict():
|
||||
"""Test remove_keys_from_dict()"""
|
||||
assert remove_keys_from_dict(('b'), {'a':1, 'b':2}) == {'a':1}
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
@pytest.mark.unitslow
|
||||
def test_remove_keys_from_dict_multiple_keys():
|
||||
"""Test remove_keys_from_dict()"""
|
||||
keys = ('a', 'b')
|
||||
@@ -224,7 +224,7 @@ def test_hexstr():
|
||||
assert hexstr(b'') == ''
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
@pytest.mark.unitslow
|
||||
def test_ipstr():
|
||||
"""Test ipstr()"""
|
||||
assert ipstr(b'1234') == '49.50.51.52'
|
||||
@@ -237,14 +237,14 @@ def test_readnet_u16():
|
||||
assert readnet_u16(b'123456', 2) == 13108
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
@pytest.mark.unitslow
|
||||
@patch('serial.tools.list_ports.comports', return_value=[])
|
||||
def test_findPorts_when_none_found(patch_comports):
|
||||
"""Test findPorts()"""
|
||||
assert not findPorts()
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
@pytest.mark.unitslow
|
||||
def test_convert_mac_addr():
|
||||
"""Test convert_mac_addr()"""
|
||||
assert convert_mac_addr('/c0gFyhb') == 'fd:cd:20:17:28:5b'
|
||||
|
||||
@@ -195,7 +195,8 @@ def support_info():
|
||||
print(' Machine: {0}'.format(platform.uname().machine))
|
||||
print(' Encoding (stdin): {0}'.format(sys.stdin.encoding))
|
||||
print(' Encoding (stdout): {0}'.format(sys.stdout.encoding))
|
||||
print(' meshtastic: v{0}'.format(pkg_resources.require('meshtastic')[0].version))
|
||||
the_version = pkg_resources.get_distribution("meshtastic").version
|
||||
print(' meshtastic: v{0}'.format(the_version))
|
||||
print(' Executable: {0}'.format(sys.argv[0]))
|
||||
print(' Python: {0} {1} {2}'.format(platform.python_version(),
|
||||
platform.python_implementation(), platform.python_compiler()))
|
||||
|
||||
2
proto
2
proto
Submodule proto updated: 7b80bde421...18fc4cdb52
@@ -1,11 +1,12 @@
|
||||
[pytest]
|
||||
|
||||
addopts = -m "not int and not smoke1 and not smoke2 and not smokewifi and not examples"
|
||||
addopts = -m "not int and not smoke1 and not smoke2 and not smokewifi and not examples and not smokevirt"
|
||||
|
||||
markers =
|
||||
unit: marks tests as unit tests
|
||||
unitslow: marks slow unit tests
|
||||
int: marks tests as integration tests
|
||||
smokevirt: marks tests as smoke tests against virtual device
|
||||
smoke1: runs smoke tests on a single device connected via USB
|
||||
smoke2: runs smoke tests on a two devices connected via USB
|
||||
smokewifi: runs smoke test on an esp32 device setup with wifi
|
||||
|
||||
3
setup.py
3
setup.py
@@ -12,7 +12,7 @@ with open("README.md", "r") as fh:
|
||||
# This call to setup() does all the work
|
||||
setup(
|
||||
name="meshtastic",
|
||||
version="1.2.52",
|
||||
version="1.2.53",
|
||||
description="Python API & client shell for talking to Meshtastic devices",
|
||||
long_description=long_description,
|
||||
long_description_content_type="text/markdown",
|
||||
@@ -27,6 +27,7 @@ setup(
|
||||
"Programming Language :: Python :: 3.7",
|
||||
"Programming Language :: Python :: 3.8",
|
||||
"Programming Language :: Python :: 3.9",
|
||||
"Programming Language :: Python :: 3.10",
|
||||
],
|
||||
packages=["meshtastic"],
|
||||
include_package_data=True,
|
||||
|
||||
Reference in New Issue
Block a user