Compare commits

...

52 Commits

Author SHA1 Message Date
mkinney
b08e96b66a Update setup.py
bump version for release
2022-01-11 11:02:26 -08:00
mkinney
9e74ead54e Merge pull request #219 from mkinney/smoke_virt
add smoke test for virtual device
2022-01-11 10:56:33 -08:00
Mike Kinney
2cf52f3df6 add a convenience target in Makefile for running the smokevirt test 2022-01-11 18:50:18 +00:00
Mike Kinney
50e9a0a9f7 exclude smokevirt from ci 2022-01-11 18:44:39 +00:00
Mike Kinney
841c44e05c add smoke test for virtual device 2022-01-11 18:39:49 +00:00
mkinney
465bafeb30 Merge pull request #218 from mkinney/fix_remote_admin_message
refactor code to only call local node when necessary; fix tests
2022-01-10 16:51:04 -08:00
Mike Kinney
cb8dafbd31 add more coverage 2022-01-10 16:40:47 -08:00
Mike Kinney
52db617b06 refactor code to only call local node when necessary; fix tests 2022-01-10 16:20:11 -08:00
Jm Casler
ec622590da updating proto submodule to latest 2022-01-09 22:23:07 -08:00
mkinney
345cb1bdc4 Merge pull request #217 from mkinney/fix_remote_admin_message
Fix remote admin message
2022-01-06 12:36:03 -08:00
Mike Kinney
8ca692a26e cannot call os.getlogin() on github instances 2022-01-06 12:33:30 -08:00
Mike Kinney
d1ea68d7dc add code coverage to recently added code 2022-01-06 12:17:42 -08:00
Mike Kinney
b56440a4e8 should not need to talk with remote node if just doing sendtext 2022-01-06 11:55:36 -08:00
github-actions
1401b949a3 Update protobuf submodule 2022-01-06 17:09:45 +00:00
mkinney
97f3ce6198 Merge pull request #215 from mkinney/combine_build_single_executables
combine the building of single executables into one action
2022-01-06 00:25:46 -08:00
Mike Kinney
8019391914 combine the building of single executables into one action 2022-01-06 00:24:59 -08:00
mkinney
8d10010ab1 Merge pull request #214 from mkinney/more_testing_stuff
move slower unit tests to unitslow
2022-01-06 00:00:42 -08:00
mkinney
a2b4d2a96a Update build_mac.yml 2022-01-05 23:47:02 -08:00
mkinney
4de558bdf6 Update build_mac.yml 2022-01-05 23:40:49 -08:00
mkinney
8ff06a0de1 Update build_mac.yml 2022-01-05 23:37:35 -08:00
mkinney
f8ad6061c1 Update build_mac.yml
revert
2022-01-05 23:06:49 -08:00
mkinney
aa4cdb6aea Update build_mac.yml 2022-01-05 23:01:13 -08:00
Mike Kinney
cba424fde0 move slower unit tests to unitslow 2022-01-05 21:12:50 -08:00
mkinney
6c7a870645 Merge pull request #213 from mkinney/add_python_310
check python v3.10 as well
2022-01-05 18:20:55 -08:00
Mike Kinney
f42b1ad4e0 check python v3.10 as well 2022-01-05 18:18:31 -08:00
mkinney
30a51952e0 Merge pull request #212 from mkinney/serial_perms
improve the permission error on linux
2022-01-05 15:08:24 -08:00
Mike Kinney
5de754c5ab improve the permission error on linux 2022-01-05 23:03:06 +00:00
mkinney
bda446c7b5 Update build_mac.yml 2022-01-05 14:13:42 -08:00
mkinney
f79540f197 Update build_mac.yml 2022-01-05 14:09:17 -08:00
mkinney
d507697e56 Merge pull request #210 from mkinney/work_on_single_executable
install meshtastic before building single executable
2022-01-05 13:58:47 -08:00
Mike Kinney
acbc1f2e30 install meshtastic before building single executable 2022-01-05 13:56:07 -08:00
mkinney
7b3c68119c Merge pull request #209 from mkinney/add_modules_for_building_single_executables
collect the meshtastic libs
2022-01-05 13:36:25 -08:00
Mike Kinney
39f97166b0 collect the meshtastic libs 2022-01-05 13:34:28 -08:00
mkinney
541d19cafb Merge pull request #208 from mkinney/create_actions_for_building_single_executables
Create actions for building single executables
2022-01-05 13:26:47 -08:00
Mike Kinney
969a81b779 Merge remote-tracking branch 'upstream/master' into create_actions_for_building_single_executables 2022-01-05 13:24:33 -08:00
Mike Kinney
3f76c1efb0 refactor version info so pyinstaller will work; add build mac and ubuntu standalone executables 2022-01-05 13:22:37 -08:00
mkinney
774849189f Merge pull request #207 from mkinney/create_build_windows_action
add a build windows action
2022-01-05 12:46:10 -08:00
Mike Kinney
53d626aa72 add a build windows action 2022-01-05 12:44:05 -08:00
mkinney
1a3a840269 Merge pull request #206 from mkinney/fully_qualify_imports
need to fully qualify imports so projects consuming the library will …
2022-01-05 11:24:13 -08:00
Mike Kinney
fe69f05e75 add python 3.6, 3.7, 3.8, and 3.9 for ci and validation 2022-01-05 11:20:04 -08:00
Mike Kinney
5c662822b9 need to fully qualify imports so projects consuming the library will work 2022-01-05 11:16:08 -08:00
mkinney
c049d3424a Merge pull request #205 from mkinney/remove_nested_keys
remove nested keys from nodes so we do not display garbage
2022-01-02 11:28:07 -08:00
Mike Kinney
471535853b bump version 2022-01-02 11:20:15 -08:00
Mike Kinney
676148cc14 meant to use decoded not decode 2022-01-02 11:19:17 -08:00
Mike Kinney
a915b05240 remove nested keys from nodes so we do not display garbage 2022-01-02 11:15:19 -08:00
Jm Casler
a1668e8c66 updating proto submodule to latest 2022-01-01 23:25:22 -08:00
mkinney
e7664cb40b Merge pull request #204 from mkinney/add_more_unit_tests
get last two lines covered in node
2022-01-01 15:51:21 -08:00
Mike Kinney
83c18f4008 working on more unit tests 2022-01-01 15:48:33 -08:00
Mike Kinney
8b6321ce7f add a few more tests 2022-01-01 15:21:53 -08:00
Mike Kinney
9fac981ba6 test heartbeatTimer 2022-01-01 14:53:57 -08:00
Mike Kinney
ccc71930f7 get last two lines covered in node 2022-01-01 13:25:36 -08:00
mkinney
9380f048fa Update setup.py
bump version
2022-01-01 11:11:54 -08:00
27 changed files with 1192 additions and 229 deletions

90
.github/workflows/build_executables.yml vendored Normal file
View File

@@ -0,0 +1,90 @@
name: Build and publish standalone executables
on: workflow_dispatch
jobs:
build-and-publish-mac:
runs-on: macos-latest
steps:
- name: Checkout
uses: actions/checkout@v2
- name: Set up Python 3.9
uses: actions/setup-python@v2
with:
python-version: 3.9
- name: Setup code signing
env:
MACOS_CERTIFICATE: ${{ secrets.MACOS_CERTIFICATE }}
MACOS_CERTIFICATE_PWD: ${{ secrets.MACOS_CERTIFICATE_PWD }}
MACOS_KEYCHAIN_PASSWORD: ${{ secrets.MACOS_KEYCHAIN_PASSWORD }}
run: |
echo $MACOS_CERTIFICATE | base64 --decode > certificate.p12
security create-keychain -p "$MACOS_KEYCHAIN_PASSWORD" meshtastic.keychain
security default-keychain -s meshtastic.keychain
security unlock-keychain -p "$MACOS_KEYCHAIN_PASSWORD" meshtastic.keychain
security import certificate.p12 -k meshtastic.keychain -P "$MACOS_CERTIFICATE_PWD" -T /usr/bin/codesign
security set-key-partition-list -S apple-tool:,apple:,codesign: -s -k "$MACOS_KEYCHAIN_PASSWORD" meshtastic.keychain
- name: Build
env:
MACOS_SIGNING_IDENTITY: ${{ secrets.MACOS_SIGNING_IDENTITY }}
run: |
pip install pyinstaller
pip install -r requirements.txt
pip install .
pyinstaller -F -n meshtastic --collect-all meshtastic --codesign-identity "$MACOS_SIGNING_IDENTITY" meshtastic/__main__.py
- uses: actions/upload-artifact@v2
with:
name: meshtastic_mac
path: dist
build-and-publish-ubuntu:
runs-on: ubuntu-latest
steps:
- name: Checkout
uses: actions/checkout@v2
- name: Set up Python 3.9
uses: actions/setup-python@v2
with:
python-version: 3.9
- name: Build
run: |
pip install pyinstaller
pip install -r requirements.txt
pip install .
pyinstaller -F -n meshtastic --collect-all meshtastic meshtastic/__main__.py
- uses: actions/upload-artifact@v2
with:
name: meshtastic_ubuntu
path: dist
build-and-publish-windows:
runs-on: windows-latest
steps:
- name: Checkout
uses: actions/checkout@v2
- name: Set up Python 3.9
uses: actions/setup-python@v2
with:
python-version: 3.9
- name: Build
run: |
pip install pyinstaller
pip install -r requirements.txt
pip install .
pyinstaller -F -n meshtastic --collect-all meshtastic meshtastic/__main__.py
- uses: actions/upload-artifact@v2
with:
name: meshtastic_windows
path: dist

View File

@@ -10,12 +10,18 @@ on:
jobs: jobs:
build: build:
runs-on: ubuntu-latest runs-on: ubuntu-latest
strategy:
matrix:
python-version:
- "3.6"
- "3.7"
- "3.8"
- "3.9"
- "3.10"
steps: steps:
- uses: actions/checkout@v2 - uses: actions/checkout@v2
- name: Install Python 3 - name: Install Python 3
uses: actions/setup-python@v1 uses: actions/setup-python@v1
with:
python-version: 3.9
- name: Uninstall meshtastic - name: Uninstall meshtastic
run: | run: |
pip3 uninstall meshtastic pip3 uninstall meshtastic
@@ -46,12 +52,18 @@ jobs:
fail_ci_if_error: true fail_ci_if_error: true
validate: validate:
runs-on: ubuntu-latest runs-on: ubuntu-latest
strategy:
matrix:
python-version:
- "3.6"
- "3.7"
- "3.8"
- "3.9"
- "3.10"
steps: steps:
- uses: actions/checkout@v2 - uses: actions/checkout@v2
- name: Install Python 3 - name: Install Python 3
uses: actions/setup-python@v1 uses: actions/setup-python@v1
with:
python-version: 3.9
- name: Install meshtastic from local - name: Install meshtastic from local
run: | run: |
pip3 install . pip3 install .

1
.gitignore vendored
View File

@@ -14,3 +14,4 @@ venv/
.DS_Store .DS_Store
__pycache__ __pycache__
examples/__pycache__ examples/__pycache__
meshtastic.spec

View File

@@ -2,6 +2,10 @@
test: test:
pytest -m unit pytest -m unit
# only run the smoke tests against the virtual device
virt:
pytest -m smokevirt
# local install # local install
install: install:
pip install . pip install .
@@ -16,7 +20,7 @@ lint:
# show the slowest unit tests # show the slowest unit tests
slow: slow:
pytest --durations=5 pytest -m unit --durations=5
# run the coverage report and open results in a browser # run the coverage report and open results in a browser
cov: cov:

View File

@@ -77,9 +77,11 @@ from pubsub import pub
from dotmap import DotMap from dotmap import DotMap
from tabulate import tabulate from tabulate import tabulate
from google.protobuf.json_format import MessageToJson from google.protobuf.json_format import MessageToJson
from .util import fixme, catchAndIgnore, stripnl, DeferredExecution, Timeout from meshtastic.util import fixme, catchAndIgnore, stripnl, DeferredExecution, Timeout
from .node import Node from meshtastic.node import Node
from . import mesh_pb2, portnums_pb2, apponly_pb2, admin_pb2, environmental_measurement_pb2, remote_hardware_pb2, channel_pb2, radioconfig_pb2, util from meshtastic import (mesh_pb2, portnums_pb2, apponly_pb2, admin_pb2,
environmental_measurement_pb2, remote_hardware_pb2,
channel_pb2, radioconfig_pb2, util)
# Note: To follow PEP224, comments should be after the module variable. # Note: To follow PEP224, comments should be after the module variable.

View File

@@ -5,6 +5,7 @@
import argparse import argparse
import platform import platform
import logging import logging
import os
import sys import sys
import time import time
import yaml import yaml
@@ -13,10 +14,11 @@ import pyqrcode
import pkg_resources import pkg_resources
import meshtastic.util import meshtastic.util
import meshtastic.test import meshtastic.test
from . import remote_hardware from meshtastic import remote_hardware
from .ble_interface import BLEInterface from meshtastic.ble_interface import BLEInterface
from . import portnums_pb2, channel_pb2, radioconfig_pb2 from meshtastic import portnums_pb2, channel_pb2, radioconfig_pb2
from .globals import Globals from meshtastic.globals import Globals
from meshtastic.__init__ import BROADCAST_ADDR
def onReceive(packet, interface): def onReceive(packet, interface):
@@ -139,14 +141,6 @@ def onConnected(interface):
if not args.export_config: if not args.export_config:
print("Connected to radio") print("Connected to radio")
def getNode():
"""This operation could be expensive, so we try to cache the results"""
targetNode = our_globals.get_target_node()
if not targetNode:
targetNode = interface.getNode(args.destOrLocal)
our_globals.set_target_node(targetNode)
return targetNode
if args.setlat or args.setlon or args.setalt: if args.setlat or args.setlon or args.setalt:
closeNow = True closeNow = True
@@ -178,12 +172,12 @@ def onConnected(interface):
if args.set_owner: if args.set_owner:
closeNow = True closeNow = True
print(f"Setting device owner to {args.set_owner}") print(f"Setting device owner to {args.set_owner}")
getNode().setOwner(args.set_owner) interface.getNode(args.dest).setOwner(args.set_owner)
if args.pos_fields: if args.pos_fields:
# If --pos-fields invoked with args, set position fields # If --pos-fields invoked with args, set position fields
closeNow = True closeNow = True
prefs = getNode().radioConfig.preferences prefs = interface.getNode(args.dest).radioConfig.preferences
allFields = 0 allFields = 0
try: try:
@@ -200,12 +194,12 @@ def onConnected(interface):
print(f"Setting position fields to {allFields}") print(f"Setting position fields to {allFields}")
setPref(prefs, 'position_flags', ('%d' % allFields)) setPref(prefs, 'position_flags', ('%d' % allFields))
print("Writing modified preferences to device") print("Writing modified preferences to device")
getNode().writeConfig() interface.getNode(args.dest).writeConfig()
elif args.pos_fields is not None: elif args.pos_fields is not None:
# If --pos-fields invoked without args, read and display current value # If --pos-fields invoked without args, read and display current value
closeNow = True closeNow = True
prefs = getNode().radioConfig.preferences prefs = interface.getNode(args.dest).radioConfig.preferences
fieldNames = [] fieldNames = []
for bit in radioconfig_pb2.PositionFlags.values(): for bit in radioconfig_pb2.PositionFlags.values():
@@ -224,81 +218,85 @@ def onConnected(interface):
print(sorted(meshtastic.mesh_pb2.Team.keys())) print(sorted(meshtastic.mesh_pb2.Team.keys()))
else: else:
print(f"Setting team to {meshtastic.mesh_pb2.Team.Name(v_team)}") print(f"Setting team to {meshtastic.mesh_pb2.Team.Name(v_team)}")
getNode().setOwner(team=v_team) interface.getNode(args.dest).setOwner(team=v_team)
if args.set_ham: if args.set_ham:
closeNow = True closeNow = True
print(f"Setting Ham ID to {args.set_ham} and turning off encryption") print(f"Setting Ham ID to {args.set_ham} and turning off encryption")
getNode().setOwner(args.set_ham, is_licensed=True) interface.getNode(args.dest).setOwner(args.set_ham, is_licensed=True)
# Must turn off encryption on primary channel # Must turn off encryption on primary channel
getNode().turnOffEncryptionOnPrimaryChannel() interface.getNode(args.dest).turnOffEncryptionOnPrimaryChannel()
if args.reboot: if args.reboot:
closeNow = True closeNow = True
getNode().reboot() interface.getNode(args.dest).reboot()
if args.sendtext: if args.sendtext:
closeNow = True closeNow = True
channelIndex = 0 channelIndex = 0
if args.ch_index is not None: if args.ch_index is not None:
channelIndex = int(args.ch_index) channelIndex = int(args.ch_index)
ch = getNode().getChannelByChannelIndex(channelIndex) ch = interface.localNode.getChannelByChannelIndex(channelIndex)
logging.debug(f'ch:{ch}')
if ch and ch.role != channel_pb2.Channel.Role.DISABLED: if ch and ch.role != channel_pb2.Channel.Role.DISABLED:
print(f"Sending text message {args.sendtext} to {args.destOrAll} on channelIndex:{channelIndex}") print(f"Sending text message {args.sendtext} to {args.dest} on channelIndex:{channelIndex}")
interface.sendText(args.sendtext, args.destOrAll, wantAck=True, channelIndex=channelIndex) interface.sendText(args.sendtext, args.dest, wantAck=True, channelIndex=channelIndex)
else: else:
meshtastic.util.our_exit(f"Warning: {channelIndex} is not a valid channel. Channel must not be DISABLED.") meshtastic.util.our_exit(f"Warning: {channelIndex} is not a valid channel. Channel must not be DISABLED.")
if args.sendping: if args.sendping:
payload = str.encode("test string") payload = str.encode("test string")
print(f"Sending ping message to {args.destOrAll}") print(f"Sending ping message to {args.dest}")
interface.sendData(payload, args.destOrAll, portNum=portnums_pb2.PortNum.REPLY_APP, interface.sendData(payload, args.dest, portNum=portnums_pb2.PortNum.REPLY_APP,
wantAck=True, wantResponse=True) wantAck=True, wantResponse=True)
if args.gpio_wrb or args.gpio_rd or args.gpio_watch: if args.gpio_wrb or args.gpio_rd or args.gpio_watch:
rhc = remote_hardware.RemoteHardwareClient(interface) if args.dest == BROADCAST_ADDR:
meshtastic.util.our_exit("Warning: Must use a destination node ID.")
else:
rhc = remote_hardware.RemoteHardwareClient(interface)
if args.gpio_wrb: if args.gpio_wrb:
bitmask = 0 bitmask = 0
bitval = 0 bitval = 0
for wrpair in (args.gpio_wrb or []): for wrpair in (args.gpio_wrb or []):
bitmask |= 1 << int(wrpair[0]) bitmask |= 1 << int(wrpair[0])
bitval |= int(wrpair[1]) << int(wrpair[0]) bitval |= int(wrpair[1]) << int(wrpair[0])
print(f"Writing GPIO mask 0x{bitmask:x} with value 0x{bitval:x} to {args.dest}") print(f"Writing GPIO mask 0x{bitmask:x} with value 0x{bitval:x} to {args.dest}")
rhc.writeGPIOs(args.dest, bitmask, bitval) rhc.writeGPIOs(args.dest, bitmask, bitval)
closeNow = True closeNow = True
if args.gpio_rd: if args.gpio_rd:
bitmask = int(args.gpio_rd, 16) bitmask = int(args.gpio_rd, 16)
print(f"Reading GPIO mask 0x{bitmask:x} from {args.dest}") print(f"Reading GPIO mask 0x{bitmask:x} from {args.dest}")
interface.mask = bitmask interface.mask = bitmask
rhc.readGPIOs(args.dest, bitmask, None) rhc.readGPIOs(args.dest, bitmask, None)
if not interface.noProto: if not interface.noProto:
# wait up to X seconds for a response # wait up to X seconds for a response
for _ in range(10): for _ in range(10):
time.sleep(1)
if interface.gotResponse:
break
logging.debug(f'end of gpio_rd')
if args.gpio_watch:
bitmask = int(args.gpio_watch, 16)
print(f"Watching GPIO mask 0x{bitmask:x} from {args.dest}. Press ctrl-c to exit")
while True:
rhc.watchGPIOs(args.dest, bitmask)
time.sleep(1) time.sleep(1)
if interface.gotResponse:
break
logging.debug(f'end of gpio_rd')
if args.gpio_watch:
bitmask = int(args.gpio_watch, 16)
print(f"Watching GPIO mask 0x{bitmask:x} from {args.dest}. Press ctrl-c to exit")
while True:
rhc.watchGPIOs(args.dest, bitmask)
time.sleep(1)
# handle settings # handle settings
if args.set: if args.set:
closeNow = True closeNow = True
prefs = getNode().radioConfig.preferences prefs = interface.getNode(args.dest).radioConfig.preferences
# Handle the int/float/bool arguments # Handle the int/float/bool arguments
for pref in args.set: for pref in args.set:
setPref(prefs, pref[0], pref[1]) setPref(prefs, pref[0], pref[1])
print("Writing modified preferences to device") print("Writing modified preferences to device")
getNode().writeConfig() interface.getNode(args.dest).writeConfig()
if args.configure: if args.configure:
with open(args.configure[0], encoding='utf8') as file: with open(args.configure[0], encoding='utf8') as file:
@@ -307,11 +305,11 @@ def onConnected(interface):
if 'owner' in configuration: if 'owner' in configuration:
print(f"Setting device owner to {configuration['owner']}") print(f"Setting device owner to {configuration['owner']}")
getNode().setOwner(configuration['owner']) interface.getNode(args.dest).setOwner(configuration['owner'])
if 'channel_url' in configuration: if 'channel_url' in configuration:
print("Setting channel url to", configuration['channel_url']) print("Setting channel url to", configuration['channel_url'])
getNode().setURL(configuration['channel_url']) interface.getNode(args.dest).setURL(configuration['channel_url'])
if 'location' in configuration: if 'location' in configuration:
alt = 0 alt = 0
@@ -336,11 +334,11 @@ def onConnected(interface):
interface.localNode.writeConfig() interface.localNode.writeConfig()
if 'user_prefs' in configuration: if 'user_prefs' in configuration:
prefs = getNode().radioConfig.preferences prefs = interface.getNode(args.dest).radioConfig.preferences
for pref in configuration['user_prefs']: for pref in configuration['user_prefs']:
setPref(prefs, pref, str(configuration['user_prefs'][pref])) setPref(prefs, pref, str(configuration['user_prefs'][pref]))
print("Writing modified preferences to device") print("Writing modified preferences to device")
getNode().writeConfig() interface.getNode(args.dest).writeConfig()
if args.export_config: if args.export_config:
# export the configuration (the opposite of '--configure') # export the configuration (the opposite of '--configure')
@@ -349,7 +347,7 @@ def onConnected(interface):
if args.seturl: if args.seturl:
closeNow = True closeNow = True
getNode().setURL(args.seturl) interface.getNode(args.dest).setURL(args.seturl)
# handle changing channels # handle changing channels
@@ -357,7 +355,7 @@ def onConnected(interface):
closeNow = True closeNow = True
if len(args.ch_add) > 10: if len(args.ch_add) > 10:
meshtastic.util.our_exit("Warning: Channel name must be shorter. Channel not added.") meshtastic.util.our_exit("Warning: Channel name must be shorter. Channel not added.")
n = getNode() n = interface.getNode(args.dest)
ch = n.getChannelByName(args.ch_add) ch = n.getChannelByName(args.ch_add)
if ch: if ch:
meshtastic.util.our_exit(f"Warning: This node already has a '{args.ch_add}' channel. No changes were made.") meshtastic.util.our_exit(f"Warning: This node already has a '{args.ch_add}' channel. No changes were made.")
@@ -385,7 +383,7 @@ def onConnected(interface):
meshtastic.util.our_exit("Warning: Cannot delete primary channel.", 1) meshtastic.util.our_exit("Warning: Cannot delete primary channel.", 1)
else: else:
print(f"Deleting channel {channelIndex}") print(f"Deleting channel {channelIndex}")
ch = getNode().deleteChannel(channelIndex) ch = interface.getNode(args.dest).deleteChannel(channelIndex)
ch_changes = [args.ch_longslow, args.ch_longfast, ch_changes = [args.ch_longslow, args.ch_longfast,
args.ch_mediumslow, args.ch_mediumfast, args.ch_mediumslow, args.ch_mediumfast,
@@ -401,7 +399,7 @@ def onConnected(interface):
channelIndex = 0 channelIndex = 0
else: else:
meshtastic.util.our_exit("Warning: Need to specify '--ch-index'.", 1) meshtastic.util.our_exit("Warning: Need to specify '--ch-index'.", 1)
ch = getNode().channels[channelIndex] ch = interface.getNode(args.dest).channels[channelIndex]
if any_primary_channel_changes or args.ch_enable or args.ch_disable: if any_primary_channel_changes or args.ch_enable or args.ch_disable:
@@ -462,21 +460,22 @@ def onConnected(interface):
ch.role = channel_pb2.Channel.Role.DISABLED ch.role = channel_pb2.Channel.Role.DISABLED
print(f"Writing modified channels to device") print(f"Writing modified channels to device")
getNode().writeChannel(channelIndex) interface.getNode(args.dest).writeChannel(channelIndex)
if args.info: if args.info:
print("") print("")
if not args.dest: # If we aren't trying to talk to our local node, don't show it # If we aren't trying to talk to our local node, don't show it
if args.dest == BROADCAST_ADDR:
interface.showInfo() interface.showInfo()
print("") print("")
getNode().showInfo() interface.getNode(args.dest).showInfo()
closeNow = True # FIXME, for now we leave the link up while talking to remote nodes closeNow = True # FIXME, for now we leave the link up while talking to remote nodes
print("") print("")
if args.get: if args.get:
closeNow = True closeNow = True
prefs = getNode().radioConfig.preferences prefs = interface.getNode(args.dest).radioConfig.preferences
# Handle the int/float/bool arguments # Handle the int/float/bool arguments
for pref in args.get: for pref in args.get:
@@ -590,13 +589,8 @@ def common():
channelIndex = int(args.ch_index) channelIndex = int(args.ch_index)
our_globals.set_channel_index(channelIndex) our_globals.set_channel_index(channelIndex)
# Some commands require dest to be set, so we now use destOrAll/destOrLocal for more lenient commands
if not args.dest: if not args.dest:
args.destOrAll = "^all" args.dest = BROADCAST_ADDR
args.destOrLocal = "^local"
else:
args.destOrAll = args.dest
args.destOrLocal = args.dest # FIXME, temp hack for debugging remove
if not args.seriallog: if not args.seriallog:
if args.noproto: if args.noproto:
@@ -635,12 +629,19 @@ def common():
elif args.host: elif args.host:
client = meshtastic.tcp_interface.TCPInterface(args.host, debugOut=logfile, noProto=args.noproto) client = meshtastic.tcp_interface.TCPInterface(args.host, debugOut=logfile, noProto=args.noproto)
else: else:
client = meshtastic.serial_interface.SerialInterface(args.port, debugOut=logfile, noProto=args.noproto) try:
client = meshtastic.serial_interface.SerialInterface(args.port, debugOut=logfile, noProto=args.noproto)
except PermissionError as ex:
username = os.getlogin()
message = "Permission Error:\n"
message += " Need to add yourself to the 'dialout' group by running:\n"
message += f" sudo usermod -a -G dialout {username}\n"
message += " After running that command, log out and re-login for it to take effect.\n"
message += f"Error was:{ex}"
meshtastic.util.our_exit(message)
# We assume client is fully connected now # We assume client is fully connected now
onConnected(client) onConnected(client)
#if logfile:
#logfile.close()
have_tunnel = platform.system() == 'Linux' have_tunnel = platform.system() == 'Linux'
if args.noproto or args.reply or (have_tunnel and args.tunnel): # loop until someone presses ctrlc if args.noproto or args.reply or (have_tunnel and args.tunnel): # loop until someone presses ctrlc
@@ -818,8 +819,8 @@ def initParser():
parser.set_defaults(deprecated=None) parser.set_defaults(deprecated=None)
parser.add_argument('--version', action='version', the_version = pkg_resources.get_distribution("meshtastic").version
version=f"{pkg_resources.require('meshtastic')[0].version}") parser.add_argument('--version', action='version', version=f"{the_version}")
parser.add_argument( parser.add_argument(
"--support", action='store_true', help="Show support info (useful when troubleshooting an issue)") "--support", action='store_true', help="Show support info (useful when troubleshooting an issue)")

View File

@@ -4,7 +4,7 @@ import logging
import pygatt import pygatt
from .mesh_interface import MeshInterface from meshtastic.mesh_interface import MeshInterface
# Our standard BLE characteristics # Our standard BLE characteristics
TORADIO_UUID = "f75c76d2-129e-4dad-a1dd-7866124401e7" TORADIO_UUID = "f75c76d2-129e-4dad-a1dd-7866124401e7"

View File

@@ -27,7 +27,6 @@ class Globals:
Globals.__instance = self Globals.__instance = self
self.args = None self.args = None
self.parser = None self.parser = None
self.target_node = None
self.channel_index = None self.channel_index = None
self.logfile = None self.logfile = None
self.tunnelInstance = None self.tunnelInstance = None
@@ -36,7 +35,6 @@ class Globals:
"""Reset all of our globals. If you add a member, add it to this method, too.""" """Reset all of our globals. If you add a member, add it to this method, too."""
self.args = None self.args = None
self.parser = None self.parser = None
self.target_node = None
self.channel_index = None self.channel_index = None
self.logfile = None self.logfile = None
self.tunnelInstance = None self.tunnelInstance = None
@@ -50,10 +48,6 @@ class Globals:
"""Set the parser""" """Set the parser"""
self.parser = parser self.parser = parser
def set_target_node(self, target_node):
"""Set the target_node"""
self.target_node = target_node
def set_channel_index(self, channel_index): def set_channel_index(self, channel_index):
"""Set the channel_index""" """Set the channel_index"""
self.channel_index = channel_index self.channel_index = channel_index
@@ -75,10 +69,6 @@ class Globals:
"""Get parser""" """Get parser"""
return self.parser return self.parser
def get_target_node(self):
"""Get target_node"""
return self.target_node
def get_channel_index(self): def get_channel_index(self):
"""Get channel_index""" """Get channel_index"""
return self.channel_index return self.channel_index

View File

@@ -17,9 +17,9 @@ from google.protobuf.json_format import MessageToJson
import meshtastic.node import meshtastic.node
from . import portnums_pb2, mesh_pb2 from meshtastic import portnums_pb2, mesh_pb2
from .util import stripnl, Timeout, our_exit, remove_keys_from_dict, convert_mac_addr from meshtastic.util import stripnl, Timeout, our_exit, remove_keys_from_dict, convert_mac_addr
from .__init__ import LOCAL_ADDR, BROADCAST_NUM, BROADCAST_ADDR, ResponseHandler, publishingThread, OUR_APP_VERSION, protocols from meshtastic.__init__ import LOCAL_ADDR, BROADCAST_NUM, BROADCAST_ADDR, ResponseHandler, publishingThread, OUR_APP_VERSION, protocols
class MeshInterface: class MeshInterface:
"""Interface class for meshtastic devices """Interface class for meshtastic devices
@@ -68,8 +68,7 @@ class MeshInterface:
def __exit__(self, exc_type, exc_value, traceback): def __exit__(self, exc_type, exc_value, traceback):
if exc_type is not None and exc_value is not None: if exc_type is not None and exc_value is not None:
logging.error( logging.error(f'An exception of type {exc_type} with value {exc_value} has occurred')
f'An exception of type {exc_type} with value {exc_value} has occurred')
if traceback is not None: if traceback is not None:
logging.error(f'Traceback: {traceback}') logging.error(f'Traceback: {traceback}')
self.close() self.close()
@@ -84,10 +83,10 @@ class MeshInterface:
nodes = "" nodes = ""
if self.nodes: if self.nodes:
for n in self.nodes.values(): for n in self.nodes.values():
# when the TBeam is first booted, it sometimes shows the 'raw' data # when the TBeam is first booted, it sometimes shows the raw data
# so, we will just remove any raw keys # so, we will just remove any raw keys
n2 = remove_keys_from_dict('raw', n) keys_to_remove = ('raw', 'decoded', 'payload')
n2 = remove_keys_from_dict('decode', n2) n2 = remove_keys_from_dict(keys_to_remove, n)
# if we have 'macaddr', re-format it # if we have 'macaddr', re-format it
if 'macaddr' in n2['user']: if 'macaddr' in n2['user']:
@@ -160,7 +159,7 @@ class MeshInterface:
def getNode(self, nodeId): def getNode(self, nodeId):
"""Return a node object which contains device settings and channel info""" """Return a node object which contains device settings and channel info"""
if nodeId == LOCAL_ADDR: if nodeId in (LOCAL_ADDR, BROADCAST_ADDR):
return self.localNode return self.localNode
else: else:
n = meshtastic.node.Node(self, nodeId) n = meshtastic.node.Node(self, nodeId)
@@ -394,11 +393,11 @@ class MeshInterface:
return user.get('shortName', None) return user.get('shortName', None)
return None return None
def _waitConnected(self): def _waitConnected(self, timeout=15.0):
"""Block until the initial node db download is complete, or timeout """Block until the initial node db download is complete, or timeout
and raise an exception""" and raise an exception"""
if not self.noProto: if not self.noProto:
if not self.isConnected.wait(15.0): # timeout after x seconds if not self.isConnected.wait(timeout): # timeout after x seconds
raise Exception("Timed out waiting for connection completion") raise Exception("Timed out waiting for connection completion")
# If we failed while connecting, raise the connection to the client # If we failed while connecting, raise the connection to the client
@@ -416,8 +415,7 @@ class MeshInterface:
def _disconnected(self): def _disconnected(self):
"""Called by subclasses to tell clients this interface has disconnected""" """Called by subclasses to tell clients this interface has disconnected"""
self.isConnected.clear() self.isConnected.clear()
publishingThread.queueWork(lambda: pub.sendMessage( publishingThread.queueWork(lambda: pub.sendMessage("meshtastic.connection.lost", interface=self))
"meshtastic.connection.lost", interface=self))
def _startHeartbeat(self): def _startHeartbeat(self):
"""We need to send a heartbeat message to the device every X seconds""" """We need to send a heartbeat message to the device every X seconds"""
@@ -443,8 +441,7 @@ class MeshInterface:
if not self.isConnected.is_set(): if not self.isConnected.is_set():
self.isConnected.set() self.isConnected.set()
self._startHeartbeat() self._startHeartbeat()
publishingThread.queueWork(lambda: pub.sendMessage( publishingThread.queueWork(lambda: pub.sendMessage("meshtastic.connection.established", interface=self))
"meshtastic.connection.established", interface=self))
def _startConfig(self): def _startConfig(self):
"""Start device packets flowing""" """Start device packets flowing"""

View File

@@ -4,8 +4,8 @@
import logging import logging
import base64 import base64
from google.protobuf.json_format import MessageToJson from google.protobuf.json_format import MessageToJson
from . import portnums_pb2, apponly_pb2, admin_pb2, channel_pb2 from meshtastic import portnums_pb2, apponly_pb2, admin_pb2, channel_pb2
from .util import pskToString, stripnl, Timeout, our_exit, fromPSK from meshtastic.util import pskToString, stripnl, Timeout, our_exit, fromPSK
@@ -257,6 +257,7 @@ class Node:
p = admin_pb2.AdminMessage() p = admin_pb2.AdminMessage()
p.get_radio_request = True p.get_radio_request = True
# TODO: should we check that localNode has an 'admin' channel?
# Show progress message for super slow operations # Show progress message for super slow operations
if self != self.iface.localNode: if self != self.iface.localNode:
print("Requesting preferences from remote node.") print("Requesting preferences from remote node.")

View File

@@ -2,8 +2,8 @@
""" """
import logging import logging
from pubsub import pub from pubsub import pub
from . import portnums_pb2, remote_hardware_pb2 from meshtastic import portnums_pb2, remote_hardware_pb2
from .util import our_exit from meshtastic.util import our_exit
def onGPIOreceive(packet, interface): def onGPIOreceive(packet, interface):

View File

@@ -6,7 +6,7 @@ import platform
import serial import serial
import meshtastic.util import meshtastic.util
from .stream_interface import StreamInterface from meshtastic.stream_interface import StreamInterface
if platform.system() != 'Windows': if platform.system() != 'Windows':
import termios import termios
@@ -40,28 +40,25 @@ class SerialInterface(StreamInterface):
# first we need to set the HUPCL so the device will not reboot based on RTS and/or DTR # first we need to set the HUPCL so the device will not reboot based on RTS and/or DTR
# see https://github.com/pyserial/pyserial/issues/124 # see https://github.com/pyserial/pyserial/issues/124
if not self.noProto: if platform.system() != 'Windows':
if platform.system() != 'Windows': with open(devPath, encoding='utf8') as f:
with open(devPath, encoding='utf8') as f: attrs = termios.tcgetattr(f)
attrs = termios.tcgetattr(f) attrs[2] = attrs[2] & ~termios.HUPCL
attrs[2] = attrs[2] & ~termios.HUPCL termios.tcsetattr(f, termios.TCSAFLUSH, attrs)
termios.tcsetattr(f, termios.TCSAFLUSH, attrs) f.close()
f.close() time.sleep(0.1)
time.sleep(0.1)
self.stream = serial.Serial(devPath, 921600, exclusive=True, timeout=0.5, write_timeout=0) self.stream = serial.Serial(devPath, 921600, exclusive=True, timeout=0.5, write_timeout=0)
if not self.noProto: self.stream.flush()
self.stream.flush() time.sleep(0.1)
time.sleep(0.1)
StreamInterface.__init__(self, debugOut=debugOut, noProto=noProto, connectNow=connectNow) StreamInterface.__init__(self, debugOut=debugOut, noProto=noProto, connectNow=connectNow)
def close(self): def close(self):
"""Close a connection to the device""" """Close a connection to the device"""
if not self.noProto: self.stream.flush()
self.stream.flush() time.sleep(0.1)
time.sleep(0.1) self.stream.flush()
self.stream.flush() time.sleep(0.1)
time.sleep(0.1)
logging.debug("Closing Serial stream") logging.debug("Closing Serial stream")
StreamInterface.close(self) StreamInterface.close(self)

View File

@@ -7,8 +7,8 @@ import traceback
import serial import serial
from .mesh_interface import MeshInterface from meshtastic.mesh_interface import MeshInterface
from .util import stripnl from meshtastic.util import stripnl
START1 = 0x94 START1 = 0x94

View File

@@ -4,7 +4,7 @@ import logging
import socket import socket
from typing import AnyStr from typing import AnyStr
from .stream_interface import StreamInterface from meshtastic.stream_interface import StreamInterface
class TCPInterface(StreamInterface): class TCPInterface(StreamInterface):
"""Interface class for meshtastic devices over a TCP link""" """Interface class for meshtastic devices over a TCP link"""

View File

@@ -8,9 +8,9 @@ import traceback
from dotmap import DotMap from dotmap import DotMap
from pubsub import pub from pubsub import pub
import meshtastic.util import meshtastic.util
from .__init__ import BROADCAST_NUM from meshtastic.__init__ import BROADCAST_NUM
from .serial_interface import SerialInterface from meshtastic.serial_interface import SerialInterface
from .tcp_interface import TCPInterface from meshtastic.tcp_interface import TCPInterface
"""The interfaces we are using for our tests""" """The interfaces we are using for our tests"""

View File

@@ -119,7 +119,6 @@ def test_main_test_no_ports(patched_find_ports, reset_globals, capsys):
sys.argv = ['', '--test'] sys.argv = ['', '--test']
Globals.getInstance().set_args(sys.argv) Globals.getInstance().set_args(sys.argv)
assert Globals.getInstance().get_target_node() is None
with pytest.raises(SystemExit) as pytest_wrapped_e: with pytest.raises(SystemExit) as pytest_wrapped_e:
main() main()
assert pytest_wrapped_e.type == SystemExit assert pytest_wrapped_e.type == SystemExit
@@ -137,7 +136,6 @@ def test_main_test_one_port(patched_find_ports, reset_globals, capsys):
sys.argv = ['', '--test'] sys.argv = ['', '--test']
Globals.getInstance().set_args(sys.argv) Globals.getInstance().set_args(sys.argv)
assert Globals.getInstance().get_target_node() is None
with pytest.raises(SystemExit) as pytest_wrapped_e: with pytest.raises(SystemExit) as pytest_wrapped_e:
main() main()
assert pytest_wrapped_e.type == SystemExit assert pytest_wrapped_e.type == SystemExit
@@ -183,7 +181,7 @@ def test_main_test_two_ports_fails(patched_test_all, reset_globals, capsys):
@pytest.mark.unit @pytest.mark.unit
def test_main_info(capsys, reset_globals): def test_main_info(capsys, caplog, reset_globals):
"""Test --info""" """Test --info"""
sys.argv = ['', '--info'] sys.argv = ['', '--info']
Globals.getInstance().set_args(sys.argv) Globals.getInstance().set_args(sys.argv)
@@ -192,13 +190,37 @@ def test_main_info(capsys, reset_globals):
def mock_showInfo(): def mock_showInfo():
print('inside mocked showInfo') print('inside mocked showInfo')
iface.showInfo.side_effect = mock_showInfo iface.showInfo.side_effect = mock_showInfo
with patch('meshtastic.serial_interface.SerialInterface', return_value=iface) as mo: with caplog.at_level(logging.DEBUG):
main() with patch('meshtastic.serial_interface.SerialInterface', return_value=iface) as mo:
main()
out, err = capsys.readouterr()
assert re.search(r'Connected to radio', out, re.MULTILINE)
assert re.search(r'inside mocked showInfo', out, re.MULTILINE)
assert err == ''
mo.assert_called()
@pytest.mark.unit
@patch('os.getlogin')
def test_main_info_with_permission_error(patched_getlogin, capsys, caplog, reset_globals):
"""Test --info"""
sys.argv = ['', '--info']
Globals.getInstance().set_args(sys.argv)
patched_getlogin.return_value = 'me'
iface = MagicMock(autospec=SerialInterface)
with caplog.at_level(logging.DEBUG):
with pytest.raises(SystemExit) as pytest_wrapped_e:
with patch('meshtastic.serial_interface.SerialInterface', return_value=iface) as mo:
mo.side_effect = PermissionError('bla bla')
main()
assert pytest_wrapped_e.type == SystemExit
assert pytest_wrapped_e.value.code == 1
out, err = capsys.readouterr() out, err = capsys.readouterr()
assert re.search(r'Connected to radio', out, re.MULTILINE) patched_getlogin.assert_called()
assert re.search(r'inside mocked showInfo', out, re.MULTILINE) assert re.search(r'Need to add yourself', out, re.MULTILINE)
assert err == '' assert err == ''
mo.assert_called()
@pytest.mark.unit @pytest.mark.unit
@@ -456,63 +478,71 @@ def test_main_sendtext_with_channel(capsys, reset_globals):
@pytest.mark.unit @pytest.mark.unit
def test_main_sendtext_with_invalid_channel(capsys, reset_globals): def test_main_sendtext_with_invalid_channel(caplog, capsys, reset_globals):
"""Test --sendtext""" """Test --sendtext"""
sys.argv = ['', '--sendtext', 'hello', '--ch-index', '-1'] sys.argv = ['', '--sendtext', 'hello', '--ch-index', '-1']
Globals.getInstance().set_args(sys.argv) Globals.getInstance().set_args(sys.argv)
iface = MagicMock(autospec=SerialInterface) iface = MagicMock(autospec=SerialInterface)
iface.getChannelByChannelIndex.return_value = None iface.localNode.getChannelByChannelIndex.return_value = None
with patch('meshtastic.serial_interface.SerialInterface', return_value=iface) as mo:
iface.getNode.return_value.getChannelByChannelIndex.return_value = None with caplog.at_level(logging.DEBUG):
with pytest.raises(SystemExit) as pytest_wrapped_e: with patch('meshtastic.serial_interface.SerialInterface', return_value=iface) as mo:
main() with pytest.raises(SystemExit) as pytest_wrapped_e:
assert pytest_wrapped_e.type == SystemExit main()
assert pytest_wrapped_e.value.code == 1 assert pytest_wrapped_e.type == SystemExit
out, err = capsys.readouterr() assert pytest_wrapped_e.value.code == 1
assert re.search(r'is not a valid channel', out, re.MULTILINE) out, err = capsys.readouterr()
assert err == '' assert re.search(r'is not a valid channel', out, re.MULTILINE)
mo.assert_called() assert err == ''
mo.assert_called()
@pytest.mark.unit @pytest.mark.unit
def test_main_sendtext_with_invalid_channel_nine(capsys, reset_globals): def test_main_sendtext_with_invalid_channel_nine(caplog, capsys, reset_globals):
"""Test --sendtext""" """Test --sendtext"""
sys.argv = ['', '--sendtext', 'hello', '--ch-index', '9'] sys.argv = ['', '--sendtext', 'hello', '--ch-index', '9']
Globals.getInstance().set_args(sys.argv) Globals.getInstance().set_args(sys.argv)
iface = MagicMock(autospec=SerialInterface) iface = MagicMock(autospec=SerialInterface)
with patch('meshtastic.serial_interface.SerialInterface', return_value=iface) as mo: iface.localNode.getChannelByChannelIndex.return_value = None
iface.getNode.return_value.getChannelByChannelIndex.return_value = None
with pytest.raises(SystemExit) as pytest_wrapped_e: with caplog.at_level(logging.DEBUG):
main() with patch('meshtastic.serial_interface.SerialInterface', return_value=iface) as mo:
assert pytest_wrapped_e.type == SystemExit with pytest.raises(SystemExit) as pytest_wrapped_e:
assert pytest_wrapped_e.value.code == 1 main()
out, err = capsys.readouterr() assert pytest_wrapped_e.type == SystemExit
assert re.search(r'is not a valid channel', out, re.MULTILINE) assert pytest_wrapped_e.value.code == 1
assert err == '' out, err = capsys.readouterr()
mo.assert_called() assert re.search(r'is not a valid channel', out, re.MULTILINE)
assert err == ''
mo.assert_called()
@pytest.mark.unit @pytest.mark.unit
def test_main_sendtext_with_dest(capsys, reset_globals): def test_main_sendtext_with_dest(capsys, caplog, reset_globals, iface_with_nodes):
"""Test --sendtext with --dest""" """Test --sendtext with --dest"""
sys.argv = ['', '--sendtext', 'hello', '--dest', 'foo'] sys.argv = ['', '--sendtext', 'hello', '--dest', 'foo']
Globals.getInstance().set_args(sys.argv) Globals.getInstance().set_args(sys.argv)
iface = MagicMock(autospec=SerialInterface) iface = iface_with_nodes
def mock_sendText(text, dest, wantAck, channelIndex): iface.myInfo.my_node_num = 2475227164
print('inside mocked sendText') mocked_channel = MagicMock(autospec=Channel)
iface.sendText.side_effect = mock_sendText iface.localNode.getChannelByChannelIndex = mocked_channel
with patch('meshtastic.serial_interface.SerialInterface', return_value=iface) as mo: with patch('meshtastic.serial_interface.SerialInterface', return_value=iface):
main() with caplog.at_level(logging.DEBUG):
out, err = capsys.readouterr()
assert re.search(r'Connected to radio', out, re.MULTILINE) with pytest.raises(SystemExit) as pytest_wrapped_e:
assert re.search(r'Sending text message', out, re.MULTILINE) main()
assert re.search(r'inside mocked sendText', out, re.MULTILINE) assert pytest_wrapped_e.type == SystemExit
assert err == '' assert pytest_wrapped_e.value.code == 1
mo.assert_called() out, err = capsys.readouterr()
assert re.search(r'Connected to radio', out, re.MULTILINE)
assert not re.search(r"Warning: 0 is not a valid channel", out, re.MULTILINE)
assert not re.search(r"There is a SECONDARY channel named 'admin'", out, re.MULTILINE)
assert re.search(r'Warning: NodeId foo not found in DB', out, re.MULTILINE)
assert err == ''
@pytest.mark.unit @pytest.mark.unit
@@ -1094,18 +1124,14 @@ def test_main_pos_fields_no_args(capsys, reset_globals):
pos_flags = MagicMock(autospec=meshtastic.radioconfig_pb2.PositionFlags) pos_flags = MagicMock(autospec=meshtastic.radioconfig_pb2.PositionFlags)
with patch('meshtastic.serial_interface.SerialInterface') as mo: with patch('meshtastic.serial_interface.SerialInterface') as mo:
mo().getNode().radioConfig.preferences.position_flags = 35
with patch('meshtastic.radioconfig_pb2.PositionFlags', return_value=pos_flags) as mrc: with patch('meshtastic.radioconfig_pb2.PositionFlags', return_value=pos_flags) as mrc:
# kind of cheating here, we are setting up the node
mocked_node = MagicMock(autospec=Node)
anode = mocked_node()
anode.radioConfig.preferences.position_flags = 35
Globals.getInstance().set_target_node(anode)
mrc.values.return_value = [0, 1, 2, 4, 8, 16, 32, 64, 128, 256] mrc.values.return_value = [0, 1, 2, 4, 8, 16, 32, 64, 128, 256]
# Note: When you use side_effect and a list, each call will use a value from the front of the list then # Note: When you use side_effect and a list, each call will use a value from the front of the list then
# remove that value from the list. If there are three values in the list, we expect it to be called # remove that value from the list. If there are three values in the list, we expect it to be called
# three times. # three times.
mrc.Name.side_effect = [ 'POS_ALTITUDE', 'POS_ALT_MSL', 'POS_BATTERY' ] mrc.Name.side_effect = ['POS_ALTITUDE', 'POS_ALT_MSL', 'POS_BATTERY']
main() main()
@@ -1186,13 +1212,9 @@ def test_main_get_with_valid_values(capsys, reset_globals):
with patch('meshtastic.serial_interface.SerialInterface') as mo: with patch('meshtastic.serial_interface.SerialInterface') as mo:
# kind of cheating here, we are setting up the node mo().getNode().radioConfig.preferences.wifi_ssid = 'foo'
mocked_node = MagicMock(autospec=Node) mo().getNode().radioConfig.preferences.ls_secs = 300
anode = mocked_node() mo().getNode().radioConfig.preferences.fixed_position = False
anode.radioConfig.preferences.wifi_ssid = 'foo'
anode.radioConfig.preferences.ls_secs = 300
anode.radioConfig.preferences.fixed_position = False
Globals.getInstance().set_target_node(anode)
main() main()
@@ -1421,7 +1443,7 @@ def test_main_export_config_called_from_main(capsys, reset_globals):
@pytest.mark.unit @pytest.mark.unit
def test_main_gpio_rd_no_gpio_channel(capsys, reset_globals): def test_main_gpio_rd_no_gpio_channel(capsys, reset_globals):
"""Test --gpio_rd with no named gpio channel""" """Test --gpio_rd with no named gpio channel"""
sys.argv = ['', '--gpio-rd', '0x10'] sys.argv = ['', '--gpio-rd', '0x10', '--dest', '!foo']
Globals.getInstance().set_args(sys.argv) Globals.getInstance().set_args(sys.argv)
iface = MagicMock(autospec=SerialInterface) iface = MagicMock(autospec=SerialInterface)
@@ -1442,9 +1464,9 @@ def test_main_gpio_rd_no_dest(capsys, reset_globals):
sys.argv = ['', '--gpio-rd', '0x2000'] sys.argv = ['', '--gpio-rd', '0x2000']
Globals.getInstance().set_args(sys.argv) Globals.getInstance().set_args(sys.argv)
channel = Channel(index=1, role=1) channel = Channel(index=2, role=2)
channel.settings.modem_config = 3 channel.settings.psk = b'\x8a\x94y\x0e\xc6\xc9\x1e5\x91\x12@\xa60\xa8\xb43\x87\x00\xf2K\x0e\xe7\x7fAz\xcd\xf5\xb0\x900\xa84'
channel.settings.psk = b'\x01' channel.settings.name = 'gpio'
iface = MagicMock(autospec=SerialInterface) iface = MagicMock(autospec=SerialInterface)
iface.localNode.getChannelByName.return_value = channel iface.localNode.getChannelByName.return_value = channel

View File

@@ -10,6 +10,8 @@ from ..mesh_interface import MeshInterface
from ..node import Node from ..node import Node
from .. import mesh_pb2 from .. import mesh_pb2
from ..__init__ import LOCAL_ADDR, BROADCAST_ADDR from ..__init__ import LOCAL_ADDR, BROADCAST_ADDR
from ..radioconfig_pb2 import RadioConfig
from ..util import Timeout
@pytest.mark.unit @pytest.mark.unit
@@ -164,6 +166,22 @@ def test_sendPosition(reset_globals, caplog):
assert re.search(r'p.time:', caplog.text, re.MULTILINE) assert re.search(r'p.time:', caplog.text, re.MULTILINE)
@pytest.mark.unit
def test_close_with_heartbeatTimer(reset_globals, caplog):
"""Test close() with heartbeatTimer"""
iface = MeshInterface(noProto=True)
anode = Node('foo', 'bar')
radioConfig = RadioConfig()
radioConfig.preferences.phone_timeout_secs = 10
anode.radioConfig = radioConfig
iface.localNode = anode
assert iface.heartbeatTimer is None
with caplog.at_level(logging.DEBUG):
iface._startHeartbeat()
assert iface.heartbeatTimer is not None
iface.close()
@pytest.mark.unit @pytest.mark.unit
def test_handleFromRadio_empty_payload(reset_globals, caplog): def test_handleFromRadio_empty_payload(reset_globals, caplog):
"""Test _handleFromRadio""" """Test _handleFromRadio"""
@@ -543,3 +561,70 @@ def test_getOrCreateByNum(capsys, reset_globals, iface_with_nodes):
iface.myInfo.my_node_num = 2475227164 iface.myInfo.my_node_num = 2475227164
tmp = iface._getOrCreateByNum(2475227164) tmp = iface._getOrCreateByNum(2475227164)
assert tmp['num'] == 2475227164 assert tmp['num'] == 2475227164
@pytest.mark.unit
def test_enter():
"""Test __enter__()"""
iface = MeshInterface(noProto=True)
assert iface == iface.__enter__()
@pytest.mark.unit
def test_exit_with_exception(caplog):
"""Test __exit__()"""
iface = MeshInterface(noProto=True)
with caplog.at_level(logging.ERROR):
iface.__exit__('foo', 'bar', 'baz')
assert re.search(r'An exception of type foo with value bar has occurred', caplog.text, re.MULTILINE)
assert re.search(r'Traceback: baz', caplog.text, re.MULTILINE)
@pytest.mark.unit
def test_showNodes_exclude_self(capsys, caplog, reset_globals, iface_with_nodes):
"""Test that we hit that continue statement"""
with caplog.at_level(logging.DEBUG):
iface = iface_with_nodes
iface.localNode.nodeNum = 2475227164
iface.showNodes()
iface.showNodes(includeSelf=False)
capsys.readouterr()
@pytest.mark.unitslow
def test_waitForConfig(caplog, capsys):
"""Test waitForConfig()"""
iface = MeshInterface(noProto=True)
# override how long to wait
iface._timeout = Timeout(0.01)
with pytest.raises(Exception) as pytest_wrapped_e:
iface.waitForConfig()
assert pytest_wrapped_e.type == Exception
out, err = capsys.readouterr()
assert re.search(r'Exception: Timed out waiting for interface config', err, re.MULTILINE)
assert out == ''
@pytest.mark.unit
def test_waitConnected_raises_an_exception(caplog, capsys):
"""Test waitConnected()"""
iface = MeshInterface(noProto=True)
with pytest.raises(Exception) as pytest_wrapped_e:
iface.failure = "warn about something"
iface._waitConnected(0.01)
assert pytest_wrapped_e.type == Exception
out, err = capsys.readouterr()
assert re.search(r'warn about something', err, re.MULTILINE)
assert out == ''
@pytest.mark.unit
def test_waitConnected_isConnected_timeout(caplog, capsys):
"""Test waitConnected()"""
with pytest.raises(Exception) as pytest_wrapped_e:
iface = MeshInterface()
iface._waitConnected(0.01)
assert pytest_wrapped_e.type == Exception
out, err = capsys.readouterr()
assert re.search(r'warn about something', err, re.MULTILINE)
assert out == ''

View File

@@ -11,6 +11,7 @@ from ..serial_interface import SerialInterface
from ..admin_pb2 import AdminMessage from ..admin_pb2 import AdminMessage
from ..channel_pb2 import Channel from ..channel_pb2 import Channel
from ..radioconfig_pb2 import RadioConfig from ..radioconfig_pb2 import RadioConfig
from ..util import Timeout
@pytest.mark.unit @pytest.mark.unit
@@ -90,6 +91,16 @@ def test_setOwner_no_short_name_and_long_name_has_words(caplog):
assert re.search(r'p.set_owner.team:0', caplog.text, re.MULTILINE) assert re.search(r'p.set_owner.team:0', caplog.text, re.MULTILINE)
@pytest.mark.unit
def test_setOwner_long_name_no_short(caplog):
"""Test setOwner"""
anode = Node('foo', 'bar', noProto=True)
with caplog.at_level(logging.DEBUG):
anode.setOwner(long_name ='Aabo', is_licensed=True)
assert re.search(r'p.set_owner.long_name:Aabo:', caplog.text, re.MULTILINE)
assert re.search(r'p.set_owner.short_name:Aab:', caplog.text, re.MULTILINE)
@pytest.mark.unit @pytest.mark.unit
def test_exitSimulator(caplog): def test_exitSimulator(caplog):
"""Test exitSimulator""" """Test exitSimulator"""
@@ -869,3 +880,14 @@ def test_onResponseRequestSetting_with_error(capsys):
out, err = capsys.readouterr() out, err = capsys.readouterr()
assert re.search(r'Error on response', out) assert re.search(r'Error on response', out)
assert err == '' assert err == ''
@pytest.mark.unitslow
def test_waitForConfig():
"""Test waitForConfig()"""
anode = Node('foo', 'bar')
radioConfig = RadioConfig()
anode.radioConfig = radioConfig
anode._timeout = Timeout(0.01)
result = anode.waitForConfig()
assert not result

View File

@@ -3,15 +3,19 @@
import re import re
from unittest.mock import patch from unittest.mock import patch, mock_open
import pytest import pytest
from ..serial_interface import SerialInterface from ..serial_interface import SerialInterface
@pytest.mark.unit @pytest.mark.unit
@patch("time.sleep")
@patch("termios.tcsetattr")
@patch("termios.tcgetattr")
@patch("builtins.open", new_callable=mock_open, read_data="data")
@patch('serial.Serial') @patch('serial.Serial')
@patch('meshtastic.util.findPorts', return_value=['/dev/ttyUSBfake']) @patch('meshtastic.util.findPorts', return_value=['/dev/ttyUSBfake'])
def test_SerialInterface_single_port(mocked_findPorts, mocked_serial, capsys): def test_SerialInterface_single_port(mocked_findPorts, mocked_serial, mocked_open, mock_get, mock_set, mock_sleep, capsys):
"""Test that we can instantiate a SerialInterface with a single port""" """Test that we can instantiate a SerialInterface with a single port"""
iface = SerialInterface(noProto=True) iface = SerialInterface(noProto=True)
iface.showInfo() iface.showInfo()

View File

@@ -0,0 +1,706 @@
"""Meshtastic smoke tests with a single virtual device via localhost.
During the CI build of the Meshtastic-device, a build.zip file is created.
Inside that build.zip is a standalone executable meshtasticd_linux_amd64.
That linux executable will simulate a Meshtastic device listening on localhost.
This smoke test runs against that localhost.
"""
import re
import subprocess
import time
import platform
import os
# Do not like using hard coded sleeps, but it probably makes
# sense to pause for the radio at apprpriate times
import pytest
from ..util import findPorts
# seconds to pause after running a meshtastic command
PAUSE_AFTER_COMMAND = 0.1
PAUSE_AFTER_REBOOT = 0.1
#TODO: need to fix the virtual device to have a reboot. When you issue the command
# below, you get "FIXME implement reboot for this platform"
#@pytest.mark.smokevirt
#def test_smokevirt_reboot():
# """Test reboot"""
# return_value, _ = subprocess.getstatusoutput('meshtastic --host localhost --reboot')
# assert return_value == 0
# # pause for the radio to reset
# time.sleep(8)
@pytest.mark.smokevirt
def test_smokevirt_info():
"""Test --info"""
return_value, out = subprocess.getstatusoutput('meshtastic --host localhost --info')
assert re.match(r'Connected to radio', out)
assert re.search(r'^Owner', out, re.MULTILINE)
assert re.search(r'^My info', out, re.MULTILINE)
assert re.search(r'^Nodes in mesh', out, re.MULTILINE)
assert re.search(r'^Preferences', out, re.MULTILINE)
assert re.search(r'^Channels', out, re.MULTILINE)
assert re.search(r'^ PRIMARY', out, re.MULTILINE)
assert re.search(r'^Primary channel URL', out, re.MULTILINE)
assert return_value == 0
@pytest.mark.smokevirt
def test_smokevirt_sendping():
"""Test --sendping"""
return_value, out = subprocess.getstatusoutput('meshtastic --host localhost --sendping')
assert re.match(r'Connected to radio', out)
assert re.search(r'^Sending ping message', out, re.MULTILINE)
assert return_value == 0
@pytest.mark.smokevirt
def test_get_with_invalid_setting():
"""Test '--get a_bad_setting'."""
return_value, out = subprocess.getstatusoutput('meshtastic --host localhost --get a_bad_setting')
assert re.search(r'Choices in sorted order', out)
assert return_value == 0
@pytest.mark.smokevirt
def test_set_with_invalid_setting():
"""Test '--set a_bad_setting'."""
return_value, out = subprocess.getstatusoutput('meshtastic --host localhost --set a_bad_setting foo')
assert re.search(r'Choices in sorted order', out)
assert return_value == 0
@pytest.mark.smokevirt
def test_ch_set_with_invalid_settingpatch_find_ports():
"""Test '--ch-set with a_bad_setting'."""
return_value, out = subprocess.getstatusoutput('meshtastic --host localhost --ch-set invalid_setting foo --ch-index 0')
assert re.search(r'Choices in sorted order', out)
assert return_value == 0
@pytest.mark.smokevirt
def test_smokevirt_pos_fields():
"""Test --pos-fields (with some values POS_ALTITUDE POS_ALT_MSL POS_BATTERY)"""
return_value, out = subprocess.getstatusoutput('meshtastic --host localhost --pos-fields POS_ALTITUDE POS_ALT_MSL POS_BATTERY')
assert re.match(r'Connected to radio', out)
assert re.search(r'^Setting position fields to 35', out, re.MULTILINE)
assert return_value == 0
# pause for the radio
time.sleep(PAUSE_AFTER_COMMAND)
return_value, out = subprocess.getstatusoutput('meshtastic --host localhost --pos-fields')
assert re.match(r'Connected to radio', out)
assert re.search(r'POS_ALTITUDE', out, re.MULTILINE)
assert re.search(r'POS_ALT_MSL', out, re.MULTILINE)
assert re.search(r'POS_BATTERY', out, re.MULTILINE)
assert return_value == 0
@pytest.mark.smokevirt
def test_smokevirt_test_with_arg_but_no_hardware():
"""Test --test
Note: Since only one device is connected, it will not do much.
"""
return_value, out = subprocess.getstatusoutput('meshtastic --host localhost --test')
assert re.search(r'^Warning: Must have at least two devices', out, re.MULTILINE)
assert return_value == 1
@pytest.mark.smokevirt
def test_smokevirt_debug():
"""Test --debug"""
return_value, out = subprocess.getstatusoutput('meshtastic --host localhost --info --debug')
assert re.search(r'^Owner', out, re.MULTILINE)
assert re.search(r'^DEBUG file', out, re.MULTILINE)
assert return_value == 0
@pytest.mark.smokevirt
def test_smokevirt_seriallog_to_file():
"""Test --seriallog to a file creates a file"""
filename = 'tmpoutput.txt'
if os.path.exists(f"{filename}"):
os.remove(f"{filename}")
return_value, _ = subprocess.getstatusoutput(f'meshtastic --host localhost --info --seriallog {filename}')
assert os.path.exists(f"{filename}")
assert return_value == 0
os.remove(f"{filename}")
@pytest.mark.smokevirt
def test_smokevirt_qr():
"""Test --qr"""
filename = 'tmpqr'
if os.path.exists(f"{filename}"):
os.remove(f"{filename}")
return_value, _ = subprocess.getstatusoutput(f'meshtastic --host localhost --qr > {filename}')
assert os.path.exists(f"{filename}")
# not really testing that a valid qr code is created, just that the file size
# is reasonably big enough for a qr code
assert os.stat(f"{filename}").st_size > 20000
assert return_value == 0
os.remove(f"{filename}")
@pytest.mark.smokevirt
def test_smokevirt_nodes():
"""Test --nodes"""
return_value, out = subprocess.getstatusoutput('meshtastic --host localhost --nodes')
assert re.match(r'Connected to radio', out)
if platform.system() != 'Windows':
assert re.search(r' User ', out, re.MULTILINE)
assert re.search(r' 1 ', out, re.MULTILINE)
assert return_value == 0
@pytest.mark.smokevirt
def test_smokevirt_send_hello():
"""Test --sendtext hello"""
return_value, out = subprocess.getstatusoutput('meshtastic --host localhost --sendtext hello')
assert re.match(r'Connected to radio', out)
assert re.search(r'^Sending text message hello to \^all', out, re.MULTILINE)
assert return_value == 0
@pytest.mark.smokevirt
def test_smokevirt_port():
"""Test --port"""
# first, get the ports
ports = findPorts()
# hopefully there is none
assert len(ports) == 0
@pytest.mark.smokevirt
def test_smokevirt_set_is_router_true():
"""Test --set is_router true"""
return_value, out = subprocess.getstatusoutput('meshtastic --host localhost --set is_router true')
assert re.match(r'Connected to radio', out)
assert re.search(r'^Set is_router to true', out, re.MULTILINE)
assert return_value == 0
# pause for the radio
time.sleep(PAUSE_AFTER_COMMAND)
return_value, out = subprocess.getstatusoutput('meshtastic --host localhost --get is_router')
assert re.search(r'^is_router: True', out, re.MULTILINE)
assert return_value == 0
@pytest.mark.smokevirt
def test_smokevirt_set_location_info():
"""Test --setlat, --setlon and --setalt """
return_value, out = subprocess.getstatusoutput('meshtastic --host localhost --setlat 32.7767 --setlon -96.7970 --setalt 1337')
assert re.match(r'Connected to radio', out)
assert re.search(r'^Fixing altitude', out, re.MULTILINE)
assert re.search(r'^Fixing latitude', out, re.MULTILINE)
assert re.search(r'^Fixing longitude', out, re.MULTILINE)
assert return_value == 0
# pause for the radio
time.sleep(PAUSE_AFTER_COMMAND)
return_value, out2 = subprocess.getstatusoutput('meshtastic --host localhost --info')
assert re.search(r'1337', out2, re.MULTILINE)
assert re.search(r'32.7767', out2, re.MULTILINE)
assert re.search(r'-96.797', out2, re.MULTILINE)
assert return_value == 0
@pytest.mark.smokevirt
def test_smokevirt_set_is_router_false():
"""Test --set is_router false"""
return_value, out = subprocess.getstatusoutput('meshtastic --host localhost --set is_router false')
assert re.match(r'Connected to radio', out)
assert re.search(r'^Set is_router to false', out, re.MULTILINE)
assert return_value == 0
# pause for the radio
time.sleep(PAUSE_AFTER_COMMAND)
return_value, out = subprocess.getstatusoutput('meshtastic --host localhost --get is_router')
assert re.search(r'^is_router: False', out, re.MULTILINE)
assert return_value == 0
@pytest.mark.smokevirt
def test_smokevirt_set_owner():
"""Test --set-owner name"""
# make sure the owner is not Joe
return_value, out = subprocess.getstatusoutput('meshtastic --host localhost --set-owner Bob')
assert re.match(r'Connected to radio', out)
assert re.search(r'^Setting device owner to Bob', out, re.MULTILINE)
assert return_value == 0
# pause for the radio
time.sleep(PAUSE_AFTER_COMMAND)
return_value, out = subprocess.getstatusoutput('meshtastic --host localhost --info')
assert not re.search(r'Owner: Joe', out, re.MULTILINE)
assert return_value == 0
# pause for the radio
time.sleep(PAUSE_AFTER_COMMAND)
return_value, out = subprocess.getstatusoutput('meshtastic --host localhost --set-owner Joe')
assert re.match(r'Connected to radio', out)
assert re.search(r'^Setting device owner to Joe', out, re.MULTILINE)
assert return_value == 0
# pause for the radio
time.sleep(PAUSE_AFTER_COMMAND)
return_value, out = subprocess.getstatusoutput('meshtastic --host localhost --info')
assert re.search(r'Owner: Joe', out, re.MULTILINE)
assert return_value == 0
@pytest.mark.smokevirt
def test_smokevirt_set_team():
"""Test --set-team """
# unset the team
return_value, out = subprocess.getstatusoutput('meshtastic --host localhost --set-team CLEAR')
assert re.match(r'Connected to radio', out)
assert re.search(r'^Setting team to CLEAR', out, re.MULTILINE)
assert return_value == 0
# pause for the radio
time.sleep(PAUSE_AFTER_REBOOT)
return_value, out = subprocess.getstatusoutput('meshtastic --host localhost --set-team CYAN')
assert re.search(r'Setting team to CYAN', out, re.MULTILINE)
assert return_value == 0
# pause for the radio
time.sleep(PAUSE_AFTER_REBOOT)
return_value, out = subprocess.getstatusoutput('meshtastic --host localhost --info')
assert re.search(r'CYAN', out, re.MULTILINE)
assert return_value == 0
@pytest.mark.smokevirt
def test_smokevirt_ch_values():
"""Test --ch-longslow, --ch-longfast, --ch-mediumslow, --ch-mediumsfast,
--ch-shortslow, and --ch-shortfast arguments
"""
exp = {
'--ch-longslow': 'Bw125Cr48Sf4096',
'--ch-longfast': 'Bw31_25Cr48Sf512',
'--ch-mediumslow': 'Bw250Cr46Sf2048',
'--ch-mediumfast': 'Bw250Cr47Sf1024',
'--ch-shortslow': '{ "psk',
'--ch-shortfast': 'Bw500Cr45Sf128'
}
for key, val in exp.items():
return_value, out = subprocess.getstatusoutput(f'meshtastic --host localhost {key}')
assert re.match(r'Connected to radio', out)
assert re.search(r'Writing modified channels to device', out, re.MULTILINE)
assert return_value == 0
# pause for the radio (might reboot)
time.sleep(PAUSE_AFTER_REBOOT)
return_value, out = subprocess.getstatusoutput('meshtastic --host localhost --info')
assert re.search(val, out, re.MULTILINE)
assert return_value == 0
# pause for the radio
time.sleep(PAUSE_AFTER_COMMAND)
@pytest.mark.smokevirt
def test_smokevirt_ch_set_name():
"""Test --ch-set name"""
return_value, out = subprocess.getstatusoutput('meshtastic --host localhost --info')
assert not re.search(r'MyChannel', out, re.MULTILINE)
assert return_value == 0
# pause for the radio
time.sleep(PAUSE_AFTER_COMMAND)
return_value, out = subprocess.getstatusoutput('meshtastic --host localhost --ch-set name MyChannel')
assert re.match(r'Connected to radio', out)
assert re.search(r'Warning: Need to specify', out, re.MULTILINE)
assert return_value == 1
# pause for the radio
time.sleep(PAUSE_AFTER_COMMAND)
return_value, out = subprocess.getstatusoutput('meshtastic --host localhost --ch-set name MyChannel --ch-index 0')
assert re.match(r'Connected to radio', out)
assert re.search(r'^Set name to MyChannel', out, re.MULTILINE)
assert return_value == 0
# pause for the radio
time.sleep(PAUSE_AFTER_COMMAND)
return_value, out = subprocess.getstatusoutput('meshtastic --host localhost --info')
assert re.search(r'MyChannel', out, re.MULTILINE)
assert return_value == 0
@pytest.mark.smokevirt
def test_smokevirt_ch_set_downlink_and_uplink():
"""Test -ch-set downlink_enabled X and --ch-set uplink_enabled X"""
return_value, out = subprocess.getstatusoutput('meshtastic --host localhost --ch-set downlink_enabled false --ch-set uplink_enabled false')
assert re.match(r'Connected to radio', out)
assert re.search(r'Warning: Need to specify', out, re.MULTILINE)
assert return_value == 1
# pause for the radio
time.sleep(PAUSE_AFTER_COMMAND)
# pylint: disable=C0301
return_value, out = subprocess.getstatusoutput('meshtastic --host localhost --ch-set downlink_enabled false --ch-set uplink_enabled false --ch-index 0')
assert re.match(r'Connected to radio', out)
assert return_value == 0
# pause for the radio
time.sleep(PAUSE_AFTER_COMMAND)
return_value, out = subprocess.getstatusoutput('meshtastic --host localhost --info')
assert not re.search(r'uplinkEnabled', out, re.MULTILINE)
assert not re.search(r'downlinkEnabled', out, re.MULTILINE)
assert return_value == 0
# pause for the radio
time.sleep(PAUSE_AFTER_COMMAND)
# pylint: disable=C0301
return_value, out = subprocess.getstatusoutput('meshtastic --host localhost --ch-set downlink_enabled true --ch-set uplink_enabled true --ch-index 0')
assert re.match(r'Connected to radio', out)
assert re.search(r'^Set downlink_enabled to true', out, re.MULTILINE)
assert re.search(r'^Set uplink_enabled to true', out, re.MULTILINE)
assert return_value == 0
# pause for the radio
time.sleep(PAUSE_AFTER_COMMAND)
return_value, out = subprocess.getstatusoutput('meshtastic --host localhost --info')
assert re.search(r'uplinkEnabled', out, re.MULTILINE)
assert re.search(r'downlinkEnabled', out, re.MULTILINE)
assert return_value == 0
@pytest.mark.smokevirt
def test_smokevirt_ch_add_and_ch_del():
"""Test --ch-add"""
return_value, out = subprocess.getstatusoutput('meshtastic --host localhost --ch-add testing')
assert re.search(r'Writing modified channels to device', out, re.MULTILINE)
assert return_value == 0
# pause for the radio
time.sleep(PAUSE_AFTER_COMMAND)
return_value, out = subprocess.getstatusoutput('meshtastic --host localhost --info')
assert re.match(r'Connected to radio', out)
assert re.search(r'SECONDARY', out, re.MULTILINE)
assert re.search(r'testing', out, re.MULTILINE)
assert return_value == 0
# pause for the radio
time.sleep(PAUSE_AFTER_COMMAND)
return_value, out = subprocess.getstatusoutput('meshtastic --host localhost --ch-index 1 --ch-del')
assert re.search(r'Deleting channel 1', out, re.MULTILINE)
assert return_value == 0
# pause for the radio
time.sleep(PAUSE_AFTER_REBOOT)
# make sure the secondar channel is not there
return_value, out = subprocess.getstatusoutput('meshtastic --host localhost --info')
assert re.match(r'Connected to radio', out)
assert not re.search(r'SECONDARY', out, re.MULTILINE)
assert not re.search(r'testing', out, re.MULTILINE)
assert return_value == 0
@pytest.mark.smokevirt
def test_smokevirt_ch_enable_and_disable():
"""Test --ch-enable and --ch-disable"""
return_value, out = subprocess.getstatusoutput('meshtastic --host localhost --ch-add testing')
assert re.search(r'Writing modified channels to device', out, re.MULTILINE)
assert return_value == 0
# pause for the radio
time.sleep(PAUSE_AFTER_COMMAND)
return_value, out = subprocess.getstatusoutput('meshtastic --host localhost --info')
assert re.match(r'Connected to radio', out)
assert re.search(r'SECONDARY', out, re.MULTILINE)
assert re.search(r'testing', out, re.MULTILINE)
assert return_value == 0
# pause for the radio
time.sleep(PAUSE_AFTER_COMMAND)
# ensure they need to specify a --ch-index
return_value, out = subprocess.getstatusoutput('meshtastic --host localhost --ch-disable')
assert return_value == 1
# pause for the radio
time.sleep(PAUSE_AFTER_COMMAND)
return_value, out = subprocess.getstatusoutput('meshtastic --host localhost --ch-disable --ch-index 1')
assert return_value == 0
# pause for the radio
time.sleep(PAUSE_AFTER_COMMAND)
return_value, out = subprocess.getstatusoutput('meshtastic --host localhost --info')
assert re.match(r'Connected to radio', out)
assert re.search(r'DISABLED', out, re.MULTILINE)
assert re.search(r'testing', out, re.MULTILINE)
assert return_value == 0
# pause for the radio
time.sleep(PAUSE_AFTER_COMMAND)
return_value, out = subprocess.getstatusoutput('meshtastic --host localhost --ch-enable --ch-index 1')
assert return_value == 0
# pause for the radio
time.sleep(PAUSE_AFTER_COMMAND)
return_value, out = subprocess.getstatusoutput('meshtastic --host localhost --info')
assert re.match(r'Connected to radio', out)
assert re.search(r'SECONDARY', out, re.MULTILINE)
assert re.search(r'testing', out, re.MULTILINE)
assert return_value == 0
# pause for the radio
time.sleep(PAUSE_AFTER_COMMAND)
return_value, out = subprocess.getstatusoutput('meshtastic --host localhost --ch-del --ch-index 1')
assert return_value == 0
# pause for the radio
time.sleep(PAUSE_AFTER_COMMAND)
@pytest.mark.smokevirt
def test_smokevirt_ch_del_a_disabled_non_primary_channel():
"""Test --ch-del will work on a disabled non-primary channel."""
return_value, out = subprocess.getstatusoutput('meshtastic --host localhost --ch-add testing')
assert re.search(r'Writing modified channels to device', out, re.MULTILINE)
assert return_value == 0
# pause for the radio
time.sleep(PAUSE_AFTER_COMMAND)
return_value, out = subprocess.getstatusoutput('meshtastic --host localhost --info')
assert re.match(r'Connected to radio', out)
assert re.search(r'SECONDARY', out, re.MULTILINE)
assert re.search(r'testing', out, re.MULTILINE)
assert return_value == 0
# pause for the radio
time.sleep(PAUSE_AFTER_COMMAND)
# ensure they need to specify a --ch-index
return_value, out = subprocess.getstatusoutput('meshtastic --host localhost --ch-disable')
assert return_value == 1
# pause for the radio
time.sleep(PAUSE_AFTER_COMMAND)
return_value, out = subprocess.getstatusoutput('meshtastic --host localhost --ch-del --ch-index 1')
assert return_value == 0
# pause for the radio
time.sleep(PAUSE_AFTER_COMMAND)
return_value, out = subprocess.getstatusoutput('meshtastic --host localhost --info')
assert re.match(r'Connected to radio', out)
assert not re.search(r'DISABLED', out, re.MULTILINE)
assert not re.search(r'SECONDARY', out, re.MULTILINE)
assert not re.search(r'testing', out, re.MULTILINE)
assert return_value == 0
# pause for the radio
time.sleep(PAUSE_AFTER_COMMAND)
@pytest.mark.smokevirt
def test_smokevirt_attempt_to_delete_primary_channel():
"""Test that we cannot delete the PRIMARY channel."""
return_value, out = subprocess.getstatusoutput('meshtastic --host localhost --ch-del --ch-index 0')
assert re.search(r'Warning: Cannot delete primary channel', out, re.MULTILINE)
assert return_value == 1
# pause for the radio
time.sleep(PAUSE_AFTER_COMMAND)
@pytest.mark.smokevirt
def test_smokevirt_attempt_to_disable_primary_channel():
"""Test that we cannot disable the PRIMARY channel."""
return_value, out = subprocess.getstatusoutput('meshtastic --host localhost --ch-disable --ch-index 0')
assert re.search(r'Warning: Cannot enable', out, re.MULTILINE)
assert return_value == 1
# pause for the radio
time.sleep(PAUSE_AFTER_COMMAND)
@pytest.mark.smokevirt
def test_smokevirt_attempt_to_enable_primary_channel():
"""Test that we cannot enable the PRIMARY channel."""
return_value, out = subprocess.getstatusoutput('meshtastic --host localhost --ch-enable --ch-index 0')
assert re.search(r'Warning: Cannot enable', out, re.MULTILINE)
assert return_value == 1
# pause for the radio
time.sleep(PAUSE_AFTER_COMMAND)
@pytest.mark.smokevirt
def test_smokevirt_ensure_ch_del_second_of_three_channels():
"""Test that when we delete the 2nd of 3 channels, that it deletes the correct channel."""
return_value, out = subprocess.getstatusoutput('meshtastic --host localhost --ch-add testing1')
assert re.match(r'Connected to radio', out)
assert return_value == 0
# pause for the radio
time.sleep(PAUSE_AFTER_COMMAND)
return_value, out = subprocess.getstatusoutput('meshtastic --host localhost --info')
assert re.match(r'Connected to radio', out)
assert re.search(r'SECONDARY', out, re.MULTILINE)
assert re.search(r'testing1', out, re.MULTILINE)
assert return_value == 0
# pause for the radio
time.sleep(PAUSE_AFTER_COMMAND)
return_value, out = subprocess.getstatusoutput('meshtastic --host localhost --ch-add testing2')
assert re.match(r'Connected to radio', out)
assert return_value == 0
# pause for the radio
time.sleep(PAUSE_AFTER_COMMAND)
return_value, out = subprocess.getstatusoutput('meshtastic --host localhost --info')
assert re.match(r'Connected to radio', out)
assert re.search(r'testing2', out, re.MULTILINE)
assert return_value == 0
# pause for the radio
time.sleep(PAUSE_AFTER_COMMAND)
return_value, out = subprocess.getstatusoutput('meshtastic --host localhost --ch-del --ch-index 1')
assert re.match(r'Connected to radio', out)
assert return_value == 0
# pause for the radio
time.sleep(PAUSE_AFTER_COMMAND)
return_value, out = subprocess.getstatusoutput('meshtastic --host localhost --info')
assert re.match(r'Connected to radio', out)
assert re.search(r'testing2', out, re.MULTILINE)
assert return_value == 0
# pause for the radio
time.sleep(PAUSE_AFTER_COMMAND)
return_value, out = subprocess.getstatusoutput('meshtastic --host localhost --ch-del --ch-index 1')
assert re.match(r'Connected to radio', out)
assert return_value == 0
# pause for the radio
time.sleep(PAUSE_AFTER_COMMAND)
@pytest.mark.smokevirt
def test_smokevirt_ensure_ch_del_third_of_three_channels():
"""Test that when we delete the 3rd of 3 channels, that it deletes the correct channel."""
return_value, out = subprocess.getstatusoutput('meshtastic --host localhost --ch-add testing1')
assert re.match(r'Connected to radio', out)
assert return_value == 0
# pause for the radio
time.sleep(PAUSE_AFTER_COMMAND)
return_value, out = subprocess.getstatusoutput('meshtastic --host localhost --info')
assert re.match(r'Connected to radio', out)
assert re.search(r'SECONDARY', out, re.MULTILINE)
assert re.search(r'testing1', out, re.MULTILINE)
assert return_value == 0
# pause for the radio
time.sleep(PAUSE_AFTER_COMMAND)
return_value, out = subprocess.getstatusoutput('meshtastic --host localhost --ch-add testing2')
assert re.match(r'Connected to radio', out)
assert return_value == 0
# pause for the radio
time.sleep(PAUSE_AFTER_COMMAND)
return_value, out = subprocess.getstatusoutput('meshtastic --host localhost --info')
assert re.match(r'Connected to radio', out)
assert re.search(r'testing2', out, re.MULTILINE)
assert return_value == 0
# pause for the radio
time.sleep(PAUSE_AFTER_COMMAND)
return_value, out = subprocess.getstatusoutput('meshtastic --host localhost --ch-del --ch-index 2')
assert re.match(r'Connected to radio', out)
assert return_value == 0
# pause for the radio
time.sleep(PAUSE_AFTER_COMMAND)
return_value, out = subprocess.getstatusoutput('meshtastic --host localhost --info')
assert re.match(r'Connected to radio', out)
assert re.search(r'testing1', out, re.MULTILINE)
assert return_value == 0
# pause for the radio
time.sleep(PAUSE_AFTER_COMMAND)
return_value, out = subprocess.getstatusoutput('meshtastic --host localhost --ch-del --ch-index 1')
assert re.match(r'Connected to radio', out)
assert return_value == 0
# pause for the radio
time.sleep(PAUSE_AFTER_COMMAND)
@pytest.mark.smokevirt
def test_smokevirt_ch_set_modem_config():
"""Test --ch-set modem_config"""
return_value, out = subprocess.getstatusoutput('meshtastic --host localhost --ch-set modem_config Bw31_25Cr48Sf512')
assert re.search(r'Warning: Need to specify', out, re.MULTILINE)
assert return_value == 1
# pause for the radio
time.sleep(PAUSE_AFTER_COMMAND)
return_value, out = subprocess.getstatusoutput('meshtastic --host localhost --info')
assert not re.search(r'Bw31_25Cr48Sf512', out, re.MULTILINE)
assert return_value == 0
# pause for the radio
time.sleep(PAUSE_AFTER_COMMAND)
return_value, out = subprocess.getstatusoutput('meshtastic --host localhost --ch-set modem_config Bw31_25Cr48Sf512 --ch-index 0')
assert re.match(r'Connected to radio', out)
assert re.search(r'^Set modem_config to Bw31_25Cr48Sf512', out, re.MULTILINE)
assert return_value == 0
# pause for the radio
time.sleep(PAUSE_AFTER_COMMAND)
return_value, out = subprocess.getstatusoutput('meshtastic --host localhost --info')
assert re.search(r'Bw31_25Cr48Sf512', out, re.MULTILINE)
assert return_value == 0
@pytest.mark.smokevirt
def test_smokevirt_seturl_default():
"""Test --seturl with default value"""
# set some channel value so we no longer have a default channel
return_value, out = subprocess.getstatusoutput('meshtastic --host localhost --ch-set name foo --ch-index 0')
assert return_value == 0
# pause for the radio
time.sleep(PAUSE_AFTER_COMMAND)
# ensure we no longer have a default primary channel
return_value, out = subprocess.getstatusoutput('meshtastic --host localhost --info')
assert not re.search('CgUYAyIBAQ', out, re.MULTILINE)
assert return_value == 0
url = "https://www.meshtastic.org/d/#CgUYAyIBAQ"
return_value, out = subprocess.getstatusoutput(f"meshtastic --host localhost --seturl {url}")
assert re.match(r'Connected to radio', out)
assert return_value == 0
# pause for the radio
time.sleep(PAUSE_AFTER_COMMAND)
return_value, out = subprocess.getstatusoutput('meshtastic --host localhost --info')
assert re.search('CgUYAyIBAQ', out, re.MULTILINE)
assert return_value == 0
@pytest.mark.smokevirt
def test_smokevirt_seturl_invalid_url():
"""Test --seturl with invalid url"""
# Note: This url is no longer a valid url.
url = "https://www.meshtastic.org/c/#GAMiENTxuzogKQdZ8Lz_q89Oab8qB0RlZmF1bHQ="
return_value, out = subprocess.getstatusoutput(f"meshtastic --host localhost --seturl {url}")
assert re.match(r'Connected to radio', out)
assert re.search('Warning: There were no settings', out, re.MULTILINE)
assert return_value == 1
# pause for the radio
time.sleep(PAUSE_AFTER_COMMAND)
@pytest.mark.smokevirt
def test_smokevirt_configure():
"""Test --configure"""
_ , out = subprocess.getstatusoutput(f"meshtastic --host localhost --configure example_config.yaml")
assert re.match(r'Connected to radio', out)
assert re.search('^Setting device owner to Bob TBeam', out, re.MULTILINE)
assert re.search('^Fixing altitude at 304 meters', out, re.MULTILINE)
assert re.search('^Fixing latitude at 35.8', out, re.MULTILINE)
assert re.search('^Fixing longitude at -93.8', out, re.MULTILINE)
assert re.search('^Setting device position', out, re.MULTILINE)
assert re.search('^Set region to 1', out, re.MULTILINE)
assert re.search('^Set is_always_powered to true', out, re.MULTILINE)
assert re.search('^Set send_owner_interval to 2', out, re.MULTILINE)
assert re.search('^Set screen_on_secs to 31536000', out, re.MULTILINE)
assert re.search('^Set wait_bluetooth_secs to 31536000', out, re.MULTILINE)
assert re.search('^Writing modified preferences to device', out, re.MULTILINE)
# pause for the radio
time.sleep(PAUSE_AFTER_REBOOT)
@pytest.mark.smokevirt
def test_smokevirt_set_ham():
"""Test --set-ham
Note: Do a factory reset after this setting so it is very short-lived.
"""
return_value, out = subprocess.getstatusoutput('meshtastic --host localhost --set-ham KI1234')
assert re.search(r'Setting Ham ID', out, re.MULTILINE)
assert return_value == 0
# pause for the radio
time.sleep(PAUSE_AFTER_REBOOT)
return_value, out = subprocess.getstatusoutput('meshtastic --host localhost --info')
assert re.search(r'Owner: KI1234', out, re.MULTILINE)
assert return_value == 0
@pytest.mark.smokevirt
def test_smokevirt_set_wifi_settings():
"""Test --set wifi_ssid and --set wifi_password"""
return_value, out = subprocess.getstatusoutput('meshtastic --host localhost --set wifi_ssid "some_ssid" --set wifi_password "temp1234"')
assert re.match(r'Connected to radio', out)
assert re.search(r'^Set wifi_ssid to some_ssid', out, re.MULTILINE)
assert re.search(r'^Set wifi_password to temp1234', out, re.MULTILINE)
assert return_value == 0
# pause for the radio
time.sleep(PAUSE_AFTER_COMMAND)
return_value, out = subprocess.getstatusoutput('meshtastic --host localhost --get wifi_ssid --get wifi_password')
assert re.search(r'^wifi_ssid: some_ssid', out, re.MULTILINE)
assert re.search(r'^wifi_password: sekrit', out, re.MULTILINE)
assert return_value == 0
@pytest.mark.smokevirt
def test_smokevirt_factory_reset():
"""Test factory reset"""
return_value, out = subprocess.getstatusoutput('meshtastic --host localhost --set factory_reset true')
assert re.match(r'Connected to radio', out)
assert re.search(r'^Set factory_reset to true', out, re.MULTILINE)
assert re.search(r'^Writing modified preferences to device', out, re.MULTILINE)
assert return_value == 0
# NOTE: The virtual radio will not respond well after this command. Need to re-start the virtual program at this point.
# TODO: fix?

View File

@@ -99,7 +99,7 @@ def test_onTunnelReceive_from_someone_else(mock_platform_system, caplog, reset_g
assert re.search(r'in onTunnelReceive', caplog.text, re.MULTILINE) assert re.search(r'in onTunnelReceive', caplog.text, re.MULTILINE)
@pytest.mark.unit @pytest.mark.unitslow
@patch('platform.system') @patch('platform.system')
def test_shouldFilterPacket_random(mock_platform_system, caplog, reset_globals, iface_with_nodes): def test_shouldFilterPacket_random(mock_platform_system, caplog, reset_globals, iface_with_nodes):
"""Test _shouldFilterPacket()""" """Test _shouldFilterPacket()"""
@@ -117,7 +117,7 @@ def test_shouldFilterPacket_random(mock_platform_system, caplog, reset_globals,
assert not ignore assert not ignore
@pytest.mark.unit @pytest.mark.unitslow
@patch('platform.system') @patch('platform.system')
def test_shouldFilterPacket_in_blacklist(mock_platform_system, caplog, reset_globals, iface_with_nodes): def test_shouldFilterPacket_in_blacklist(mock_platform_system, caplog, reset_globals, iface_with_nodes):
"""Test _shouldFilterPacket()""" """Test _shouldFilterPacket()"""
@@ -135,7 +135,7 @@ def test_shouldFilterPacket_in_blacklist(mock_platform_system, caplog, reset_glo
assert ignore assert ignore
@pytest.mark.unit @pytest.mark.unitslow
@patch('platform.system') @patch('platform.system')
def test_shouldFilterPacket_icmp(mock_platform_system, caplog, reset_globals, iface_with_nodes): def test_shouldFilterPacket_icmp(mock_platform_system, caplog, reset_globals, iface_with_nodes):
"""Test _shouldFilterPacket()""" """Test _shouldFilterPacket()"""
@@ -173,7 +173,7 @@ def test_shouldFilterPacket_udp(mock_platform_system, caplog, reset_globals, ifa
assert not ignore assert not ignore
@pytest.mark.unit @pytest.mark.unitslow
@patch('platform.system') @patch('platform.system')
def test_shouldFilterPacket_udp_blacklisted(mock_platform_system, caplog, reset_globals, iface_with_nodes): def test_shouldFilterPacket_udp_blacklisted(mock_platform_system, caplog, reset_globals, iface_with_nodes):
"""Test _shouldFilterPacket()""" """Test _shouldFilterPacket()"""
@@ -213,7 +213,7 @@ def test_shouldFilterPacket_tcp(mock_platform_system, caplog, reset_globals, ifa
assert not ignore assert not ignore
@pytest.mark.unit @pytest.mark.unitslow
@patch('platform.system') @patch('platform.system')
def test_shouldFilterPacket_tcp_blacklisted(mock_platform_system, caplog, reset_globals, iface_with_nodes): def test_shouldFilterPacket_tcp_blacklisted(mock_platform_system, caplog, reset_globals, iface_with_nodes):
"""Test _shouldFilterPacket()""" """Test _shouldFilterPacket()"""
@@ -234,7 +234,7 @@ def test_shouldFilterPacket_tcp_blacklisted(mock_platform_system, caplog, reset_
assert ignore assert ignore
@pytest.mark.unit @pytest.mark.unitslow
@patch('platform.system') @patch('platform.system')
def test_ipToNodeId_none(mock_platform_system, caplog, reset_globals, iface_with_nodes): def test_ipToNodeId_none(mock_platform_system, caplog, reset_globals, iface_with_nodes):
"""Test _ipToNodeId()""" """Test _ipToNodeId()"""
@@ -250,7 +250,7 @@ def test_ipToNodeId_none(mock_platform_system, caplog, reset_globals, iface_with
assert nodeid is None assert nodeid is None
@pytest.mark.unit @pytest.mark.unitslow
@patch('platform.system') @patch('platform.system')
def test_ipToNodeId_all(mock_platform_system, caplog, reset_globals, iface_with_nodes): def test_ipToNodeId_all(mock_platform_system, caplog, reset_globals, iface_with_nodes):
"""Test _ipToNodeId()""" """Test _ipToNodeId()"""

View File

@@ -88,7 +88,7 @@ def test_pskToString_one_byte_zero_value():
assert pskToString(bytes([0x00])) == 'unencrypted' assert pskToString(bytes([0x00])) == 'unencrypted'
@pytest.mark.unit @pytest.mark.unitslow
def test_pskToString_one_byte_non_zero_value(): def test_pskToString_one_byte_non_zero_value():
"""Test pskToString one byte that is non-zero""" """Test pskToString one byte that is non-zero"""
assert pskToString(bytes([0x01])) == 'default' assert pskToString(bytes([0x01])) == 'default'
@@ -118,7 +118,7 @@ def test_our_exit_zero_return_value(capsys):
assert pytest_wrapped_e.value.code == 0 assert pytest_wrapped_e.value.code == 0
@pytest.mark.unit @pytest.mark.unitslow
def test_our_exit_non_zero_return_value(capsys): def test_our_exit_non_zero_return_value(capsys):
"""Test our_exit with a non-zero return value""" """Test our_exit with a non-zero return value"""
with pytest.raises(SystemExit) as pytest_wrapped_e: with pytest.raises(SystemExit) as pytest_wrapped_e:
@@ -130,7 +130,7 @@ def test_our_exit_non_zero_return_value(capsys):
assert pytest_wrapped_e.value.code == 1 assert pytest_wrapped_e.value.code == 1
@pytest.mark.unit @pytest.mark.unitslow
def test_fixme(): def test_fixme():
"""Test fixme()""" """Test fixme()"""
with pytest.raises(Exception) as pytest_wrapped_e: with pytest.raises(Exception) as pytest_wrapped_e:
@@ -166,7 +166,7 @@ def test_remove_keys_from_dict_empty_keys_empty_dict():
assert not remove_keys_from_dict((), {}) assert not remove_keys_from_dict((), {})
@pytest.mark.unit @pytest.mark.unitslow
def test_remove_keys_from_dict_empty_dict(): def test_remove_keys_from_dict_empty_dict():
"""Test when dict is empty""" """Test when dict is empty"""
assert not remove_keys_from_dict(('a'), {}) assert not remove_keys_from_dict(('a'), {})
@@ -178,12 +178,29 @@ def test_remove_keys_from_dict_empty_keys():
assert remove_keys_from_dict((), {'a':1}) == {'a':1} assert remove_keys_from_dict((), {'a':1}) == {'a':1}
@pytest.mark.unit @pytest.mark.unitslow
def test_remove_keys_from_dict(): def test_remove_keys_from_dict():
"""Test remove_keys_from_dict()""" """Test remove_keys_from_dict()"""
assert remove_keys_from_dict(('b'), {'a':1, 'b':2}) == {'a':1} assert remove_keys_from_dict(('b'), {'a':1, 'b':2}) == {'a':1}
@pytest.mark.unitslow
def test_remove_keys_from_dict_multiple_keys():
"""Test remove_keys_from_dict()"""
keys = ('a', 'b')
adict = {'a': 1, 'b': 2, 'c': 3}
assert remove_keys_from_dict(keys, adict) == {'c':3}
@pytest.mark.unit
def test_remove_keys_from_dict_nested():
"""Test remove_keys_from_dict()"""
keys = ('b')
adict = {'a': {'b': 1}, 'b': 2, 'c': 3}
exp = {'a': {}, 'c': 3}
assert remove_keys_from_dict(keys, adict) == exp
@pytest.mark.unitslow @pytest.mark.unitslow
def test_Timeout_not_found(): def test_Timeout_not_found():
"""Test Timeout()""" """Test Timeout()"""
@@ -207,7 +224,7 @@ def test_hexstr():
assert hexstr(b'') == '' assert hexstr(b'') == ''
@pytest.mark.unit @pytest.mark.unitslow
def test_ipstr(): def test_ipstr():
"""Test ipstr()""" """Test ipstr()"""
assert ipstr(b'1234') == '49.50.51.52' assert ipstr(b'1234') == '49.50.51.52'
@@ -220,14 +237,14 @@ def test_readnet_u16():
assert readnet_u16(b'123456', 2) == 13108 assert readnet_u16(b'123456', 2) == 13108
@pytest.mark.unit @pytest.mark.unitslow
@patch('serial.tools.list_ports.comports', return_value=[]) @patch('serial.tools.list_ports.comports', return_value=[])
def test_findPorts_when_none_found(patch_comports): def test_findPorts_when_none_found(patch_comports):
"""Test findPorts()""" """Test findPorts()"""
assert not findPorts() assert not findPorts()
@pytest.mark.unit @pytest.mark.unitslow
def test_convert_mac_addr(): def test_convert_mac_addr():
"""Test convert_mac_addr()""" """Test convert_mac_addr()"""
assert convert_mac_addr('/c0gFyhb') == 'fd:cd:20:17:28:5b' assert convert_mac_addr('/c0gFyhb') == 'fd:cd:20:17:28:5b'

View File

@@ -22,9 +22,9 @@ from pubsub import pub
from pytap2 import TapDevice from pytap2 import TapDevice
from . import portnums_pb2 from meshtastic import portnums_pb2
from .util import ipstr, readnet_u16 from meshtastic.util import ipstr, readnet_u16
from .globals import Globals from meshtastic.globals import Globals
def onTunnelReceive(packet, interface): def onTunnelReceive(packet, interface):

View File

@@ -195,7 +195,8 @@ def support_info():
print(' Machine: {0}'.format(platform.uname().machine)) print(' Machine: {0}'.format(platform.uname().machine))
print(' Encoding (stdin): {0}'.format(sys.stdin.encoding)) print(' Encoding (stdin): {0}'.format(sys.stdin.encoding))
print(' Encoding (stdout): {0}'.format(sys.stdout.encoding)) print(' Encoding (stdout): {0}'.format(sys.stdout.encoding))
print(' meshtastic: v{0}'.format(pkg_resources.require('meshtastic')[0].version)) the_version = pkg_resources.get_distribution("meshtastic").version
print(' meshtastic: v{0}'.format(the_version))
print(' Executable: {0}'.format(sys.argv[0])) print(' Executable: {0}'.format(sys.argv[0]))
print(' Python: {0} {1} {2}'.format(platform.python_version(), print(' Python: {0} {1} {2}'.format(platform.python_version(),
platform.python_implementation(), platform.python_compiler())) platform.python_implementation(), platform.python_compiler()))
@@ -204,12 +205,18 @@ def support_info():
def remove_keys_from_dict(keys, adict): def remove_keys_from_dict(keys, adict):
"""Return a dictionary without some keys in it.""" """Return a dictionary without some keys in it.
newdict = adict Will removed nested keys.
"""
for key in keys: for key in keys:
if key in adict: try:
del newdict[key] del adict[key]
return newdict except:
pass
for val in adict.values():
if isinstance(val, dict):
remove_keys_from_dict(keys, val)
return adict
def hexstr(barray): def hexstr(barray):

2
proto

Submodule proto updated: 1d3b4806ab...18fc4cdb52

View File

@@ -1,11 +1,12 @@
[pytest] [pytest]
addopts = -m "not int and not smoke1 and not smoke2 and not smokewifi and not examples" addopts = -m "not int and not smoke1 and not smoke2 and not smokewifi and not examples and not smokevirt"
markers = markers =
unit: marks tests as unit tests unit: marks tests as unit tests
unitslow: marks slow unit tests unitslow: marks slow unit tests
int: marks tests as integration tests int: marks tests as integration tests
smokevirt: marks tests as smoke tests against virtual device
smoke1: runs smoke tests on a single device connected via USB smoke1: runs smoke tests on a single device connected via USB
smoke2: runs smoke tests on a two devices connected via USB smoke2: runs smoke tests on a two devices connected via USB
smokewifi: runs smoke test on an esp32 device setup with wifi smokewifi: runs smoke test on an esp32 device setup with wifi

View File

@@ -12,7 +12,7 @@ with open("README.md", "r") as fh:
# This call to setup() does all the work # This call to setup() does all the work
setup( setup(
name="meshtastic", name="meshtastic",
version="1.2.49", version="1.2.53",
description="Python API & client shell for talking to Meshtastic devices", description="Python API & client shell for talking to Meshtastic devices",
long_description=long_description, long_description=long_description,
long_description_content_type="text/markdown", long_description_content_type="text/markdown",
@@ -23,7 +23,11 @@ setup(
classifiers=[ classifiers=[
"License :: OSI Approved :: MIT License", "License :: OSI Approved :: MIT License",
"Programming Language :: Python :: 3", "Programming Language :: Python :: 3",
"Programming Language :: Python :: 3.6",
"Programming Language :: Python :: 3.7", "Programming Language :: Python :: 3.7",
"Programming Language :: Python :: 3.8",
"Programming Language :: Python :: 3.9",
"Programming Language :: Python :: 3.10",
], ],
packages=["meshtastic"], packages=["meshtastic"],
include_package_data=True, include_package_data=True,