Compare commits

...

81 Commits

Author SHA1 Message Date
mkinney
b08e96b66a Update setup.py
bump version for release
2022-01-11 11:02:26 -08:00
mkinney
9e74ead54e Merge pull request #219 from mkinney/smoke_virt
add smoke test for virtual device
2022-01-11 10:56:33 -08:00
Mike Kinney
2cf52f3df6 add a convenience target in Makefile for running the smokevirt test 2022-01-11 18:50:18 +00:00
Mike Kinney
50e9a0a9f7 exclude smokevirt from ci 2022-01-11 18:44:39 +00:00
Mike Kinney
841c44e05c add smoke test for virtual device 2022-01-11 18:39:49 +00:00
mkinney
465bafeb30 Merge pull request #218 from mkinney/fix_remote_admin_message
refactor code to only call local node when necessary; fix tests
2022-01-10 16:51:04 -08:00
Mike Kinney
cb8dafbd31 add more coverage 2022-01-10 16:40:47 -08:00
Mike Kinney
52db617b06 refactor code to only call local node when necessary; fix tests 2022-01-10 16:20:11 -08:00
Jm Casler
ec622590da updating proto submodule to latest 2022-01-09 22:23:07 -08:00
mkinney
345cb1bdc4 Merge pull request #217 from mkinney/fix_remote_admin_message
Fix remote admin message
2022-01-06 12:36:03 -08:00
Mike Kinney
8ca692a26e cannot call os.getlogin() on github instances 2022-01-06 12:33:30 -08:00
Mike Kinney
d1ea68d7dc add code coverage to recently added code 2022-01-06 12:17:42 -08:00
Mike Kinney
b56440a4e8 should not need to talk with remote node if just doing sendtext 2022-01-06 11:55:36 -08:00
github-actions
1401b949a3 Update protobuf submodule 2022-01-06 17:09:45 +00:00
mkinney
97f3ce6198 Merge pull request #215 from mkinney/combine_build_single_executables
combine the building of single executables into one action
2022-01-06 00:25:46 -08:00
Mike Kinney
8019391914 combine the building of single executables into one action 2022-01-06 00:24:59 -08:00
mkinney
8d10010ab1 Merge pull request #214 from mkinney/more_testing_stuff
move slower unit tests to unitslow
2022-01-06 00:00:42 -08:00
mkinney
a2b4d2a96a Update build_mac.yml 2022-01-05 23:47:02 -08:00
mkinney
4de558bdf6 Update build_mac.yml 2022-01-05 23:40:49 -08:00
mkinney
8ff06a0de1 Update build_mac.yml 2022-01-05 23:37:35 -08:00
mkinney
f8ad6061c1 Update build_mac.yml
revert
2022-01-05 23:06:49 -08:00
mkinney
aa4cdb6aea Update build_mac.yml 2022-01-05 23:01:13 -08:00
Mike Kinney
cba424fde0 move slower unit tests to unitslow 2022-01-05 21:12:50 -08:00
mkinney
6c7a870645 Merge pull request #213 from mkinney/add_python_310
check python v3.10 as well
2022-01-05 18:20:55 -08:00
Mike Kinney
f42b1ad4e0 check python v3.10 as well 2022-01-05 18:18:31 -08:00
mkinney
30a51952e0 Merge pull request #212 from mkinney/serial_perms
improve the permission error on linux
2022-01-05 15:08:24 -08:00
Mike Kinney
5de754c5ab improve the permission error on linux 2022-01-05 23:03:06 +00:00
mkinney
bda446c7b5 Update build_mac.yml 2022-01-05 14:13:42 -08:00
mkinney
f79540f197 Update build_mac.yml 2022-01-05 14:09:17 -08:00
mkinney
d507697e56 Merge pull request #210 from mkinney/work_on_single_executable
install meshtastic before building single executable
2022-01-05 13:58:47 -08:00
Mike Kinney
acbc1f2e30 install meshtastic before building single executable 2022-01-05 13:56:07 -08:00
mkinney
7b3c68119c Merge pull request #209 from mkinney/add_modules_for_building_single_executables
collect the meshtastic libs
2022-01-05 13:36:25 -08:00
Mike Kinney
39f97166b0 collect the meshtastic libs 2022-01-05 13:34:28 -08:00
mkinney
541d19cafb Merge pull request #208 from mkinney/create_actions_for_building_single_executables
Create actions for building single executables
2022-01-05 13:26:47 -08:00
Mike Kinney
969a81b779 Merge remote-tracking branch 'upstream/master' into create_actions_for_building_single_executables 2022-01-05 13:24:33 -08:00
Mike Kinney
3f76c1efb0 refactor version info so pyinstaller will work; add build mac and ubuntu standalone executables 2022-01-05 13:22:37 -08:00
mkinney
774849189f Merge pull request #207 from mkinney/create_build_windows_action
add a build windows action
2022-01-05 12:46:10 -08:00
Mike Kinney
53d626aa72 add a build windows action 2022-01-05 12:44:05 -08:00
mkinney
1a3a840269 Merge pull request #206 from mkinney/fully_qualify_imports
need to fully qualify imports so projects consuming the library will …
2022-01-05 11:24:13 -08:00
Mike Kinney
fe69f05e75 add python 3.6, 3.7, 3.8, and 3.9 for ci and validation 2022-01-05 11:20:04 -08:00
Mike Kinney
5c662822b9 need to fully qualify imports so projects consuming the library will work 2022-01-05 11:16:08 -08:00
mkinney
c049d3424a Merge pull request #205 from mkinney/remove_nested_keys
remove nested keys from nodes so we do not display garbage
2022-01-02 11:28:07 -08:00
Mike Kinney
471535853b bump version 2022-01-02 11:20:15 -08:00
Mike Kinney
676148cc14 meant to use decoded not decode 2022-01-02 11:19:17 -08:00
Mike Kinney
a915b05240 remove nested keys from nodes so we do not display garbage 2022-01-02 11:15:19 -08:00
Jm Casler
a1668e8c66 updating proto submodule to latest 2022-01-01 23:25:22 -08:00
mkinney
e7664cb40b Merge pull request #204 from mkinney/add_more_unit_tests
get last two lines covered in node
2022-01-01 15:51:21 -08:00
Mike Kinney
83c18f4008 working on more unit tests 2022-01-01 15:48:33 -08:00
Mike Kinney
8b6321ce7f add a few more tests 2022-01-01 15:21:53 -08:00
Mike Kinney
9fac981ba6 test heartbeatTimer 2022-01-01 14:53:57 -08:00
Mike Kinney
ccc71930f7 get last two lines covered in node 2022-01-01 13:25:36 -08:00
mkinney
9380f048fa Update setup.py
bump version
2022-01-01 11:11:54 -08:00
mkinney
0a655ac8df Merge pull request #203 from mkinney/minor_changes
do not print line for export; comment out ble test; do not send decoded
2022-01-01 09:51:34 -08:00
Mike Kinney
0b6676c5b3 do not print line for export; comment out ble test; do not send decoded 2022-01-01 09:49:21 -08:00
mkinney
e5ecba7ec0 Merge pull request #202 from mkinney/format_mac_address
if mac address is in nodes, format it like a valid mac address
2021-12-31 20:04:19 -08:00
Mike Kinney
a1809f5b84 if mac address is in nodes, format it like a valid mac address 2021-12-31 20:01:14 -08:00
mkinney
9c66447913 Merge pull request #201 from mkinney/more_testing
added tests for _getOrCreateByNum(), nodeNumToId(), and _fixupPositio…
2021-12-31 14:26:37 -08:00
Mike Kinney
65960fb982 added tests for _getOrCreateByNum(), nodeNumToId(), and _fixupPosition(); found/fixed bug on _fixupPosition 2021-12-31 13:43:37 -08:00
mkinney
9d0bc09e0f Merge pull request #200 from mkinney/tuning_tests
Tuning tests
2021-12-31 12:30:00 -08:00
Mike Kinney
475ddcc8dd add tests for _ipToNodeId() 2021-12-31 12:28:14 -08:00
Mike Kinney
105276f98e add unit tests for _shouldFilterPacket() 2021-12-31 12:17:04 -08:00
Mike Kinney
4ee647403b fix output on tests using pytest -s option; fixed some tests 2021-12-31 10:55:13 -08:00
Mike Kinney
10f48f130f move some unit tests to unitslow 2021-12-31 09:59:22 -08:00
mkinney
bd697864e4 Merge pull request #199 from mkinney/unit_testing_continues
revert the stream interface change; fix tunnel tests
2021-12-31 09:40:58 -08:00
Mike Kinney
ab876c9efd add unit test for findPorts() 2021-12-31 09:38:44 -08:00
Mike Kinney
aba303c677 figured out issue; had device connected to serial port; needed to patch; fixed tunnel test in main 2021-12-31 09:28:17 -08:00
Mike Kinney
43d59ca8d8 temp comment out tests that pass locally but not when run from CI 2021-12-31 08:53:17 -08:00
Mike Kinney
177705aeff revert the stream interface change; fix tunnel tests 2021-12-31 08:49:13 -08:00
Sacha Weatherstone
b92fff0da6 Create vercel.json 2021-12-31 20:46:21 +11:00
mkinney
6a6b72a2ae Merge pull request #198 from mkinney/keep_working_on_unit_tests
start to add unit tests for tunnel
2021-12-30 23:01:05 -08:00
Mike Kinney
614a90c0eb unit test a few more lines 2021-12-30 22:59:01 -08:00
Mike Kinney
9adbed4be6 add unit tests for onTunnelReceive() 2021-12-30 22:52:49 -08:00
Mike Kinney
809f005f61 add unit tests for ipstr(), hexstr(), and readnet_u16() 2021-12-30 22:26:26 -08:00
Mike Kinney
d366e74e86 refactor of Tunnel() for unit testing; create unit tests for Tunnel() 2021-12-30 21:24:32 -08:00
Mike Kinney
3f307880f9 add unit tests for tunnel and subnet 2021-12-30 20:04:32 -08:00
Mike Kinney
50523ec1b1 start to add unit tests for tunnel 2021-12-30 19:37:38 -08:00
mkinney
684b2885aa Merge pull request #197 from mkinney/work_on_unit_tests
add more tests; do not need the old --reply test
2021-12-30 12:28:36 -08:00
Mike Kinney
f5eb8738fb added unit tests for --ch-set and onNode() 2021-12-30 12:20:24 -08:00
Mike Kinney
14941c742a add more tests; do not need the old --reply test 2021-12-30 11:28:03 -08:00
mkinney
217add3b00 Update README.md
add code coverage badge
2021-12-30 09:32:18 -08:00
mkinney
4bac85b6a9 Update ci.yml 2021-12-30 09:21:17 -08:00
34 changed files with 2077 additions and 415 deletions

View File

@@ -1,2 +1,6 @@
[run]
omit = meshtastic/*_pb2.py,meshtastic/tests/*.py,meshtastic/test.py
[report]
exclude_lines =
if __name__ == .__main__.:

90
.github/workflows/build_executables.yml vendored Normal file
View File

@@ -0,0 +1,90 @@
name: Build and publish standalone executables
on: workflow_dispatch
jobs:
build-and-publish-mac:
runs-on: macos-latest
steps:
- name: Checkout
uses: actions/checkout@v2
- name: Set up Python 3.9
uses: actions/setup-python@v2
with:
python-version: 3.9
- name: Setup code signing
env:
MACOS_CERTIFICATE: ${{ secrets.MACOS_CERTIFICATE }}
MACOS_CERTIFICATE_PWD: ${{ secrets.MACOS_CERTIFICATE_PWD }}
MACOS_KEYCHAIN_PASSWORD: ${{ secrets.MACOS_KEYCHAIN_PASSWORD }}
run: |
echo $MACOS_CERTIFICATE | base64 --decode > certificate.p12
security create-keychain -p "$MACOS_KEYCHAIN_PASSWORD" meshtastic.keychain
security default-keychain -s meshtastic.keychain
security unlock-keychain -p "$MACOS_KEYCHAIN_PASSWORD" meshtastic.keychain
security import certificate.p12 -k meshtastic.keychain -P "$MACOS_CERTIFICATE_PWD" -T /usr/bin/codesign
security set-key-partition-list -S apple-tool:,apple:,codesign: -s -k "$MACOS_KEYCHAIN_PASSWORD" meshtastic.keychain
- name: Build
env:
MACOS_SIGNING_IDENTITY: ${{ secrets.MACOS_SIGNING_IDENTITY }}
run: |
pip install pyinstaller
pip install -r requirements.txt
pip install .
pyinstaller -F -n meshtastic --collect-all meshtastic --codesign-identity "$MACOS_SIGNING_IDENTITY" meshtastic/__main__.py
- uses: actions/upload-artifact@v2
with:
name: meshtastic_mac
path: dist
build-and-publish-ubuntu:
runs-on: ubuntu-latest
steps:
- name: Checkout
uses: actions/checkout@v2
- name: Set up Python 3.9
uses: actions/setup-python@v2
with:
python-version: 3.9
- name: Build
run: |
pip install pyinstaller
pip install -r requirements.txt
pip install .
pyinstaller -F -n meshtastic --collect-all meshtastic meshtastic/__main__.py
- uses: actions/upload-artifact@v2
with:
name: meshtastic_ubuntu
path: dist
build-and-publish-windows:
runs-on: windows-latest
steps:
- name: Checkout
uses: actions/checkout@v2
- name: Set up Python 3.9
uses: actions/setup-python@v2
with:
python-version: 3.9
- name: Build
run: |
pip install pyinstaller
pip install -r requirements.txt
pip install .
pyinstaller -F -n meshtastic --collect-all meshtastic meshtastic/__main__.py
- uses: actions/upload-artifact@v2
with:
name: meshtastic_windows
path: dist

View File

@@ -10,12 +10,18 @@ on:
jobs:
build:
runs-on: ubuntu-latest
strategy:
matrix:
python-version:
- "3.6"
- "3.7"
- "3.8"
- "3.9"
- "3.10"
steps:
- uses: actions/checkout@v2
- name: Install Python 3
uses: actions/setup-python@v1
with:
python-version: 3.9
- name: Uninstall meshtastic
run: |
pip3 uninstall meshtastic
@@ -32,14 +38,32 @@ jobs:
run: pylint meshtastic
- name: Run tests with pytest
run: pytest --cov=meshtastic
- name: Generate coverage report
run: |
pytest --cov=meshtastic --cov-report=xml
- name: Upload coverage to Codecov
uses: codecov/codecov-action@v1
with:
token: ${{ secrets.CODECOV_TOKEN }}
file: ./coverage.xml
flags: unittests
name: codecov-umbrella
yml: ./codecov.yml
fail_ci_if_error: true
validate:
runs-on: ubuntu-latest
strategy:
matrix:
python-version:
- "3.6"
- "3.7"
- "3.8"
- "3.9"
- "3.10"
steps:
- uses: actions/checkout@v2
- name: Install Python 3
uses: actions/setup-python@v1
with:
python-version: 3.9
- name: Install meshtastic from local
run: |
pip3 install .

1
.gitignore vendored
View File

@@ -14,3 +14,4 @@ venv/
.DS_Store
__pycache__
examples/__pycache__
meshtastic.spec

View File

@@ -2,6 +2,10 @@
test:
pytest -m unit
# only run the smoke tests against the virtual device
virt:
pytest -m smokevirt
# local install
install:
pip install .
@@ -16,7 +20,7 @@ lint:
# show the slowest unit tests
slow:
pytest --durations=0
pytest -m unit --durations=5
# run the coverage report and open results in a browser
cov:

View File

@@ -2,6 +2,7 @@
[![Open in Visual Studio Code](https://open.vscode.dev/badges/open-in-vscode.svg)](https://open.vscode.dev/meshtastic/Meshtastic-python)
![Unit Tests](https://github.com/meshtastic/Meshtastic-python/actions/workflows/ci.yml/badge.svg)
[![codecov](https://codecov.io/gh/meshtastic/Meshtastic-python/branch/master/graph/badge.svg?token=TIWPJL73KV)](https://codecov.io/gh/meshtastic/Meshtastic-python)
A python client for using [Meshtastic](https://www.meshtastic.org) devices. This small library (and example application) provides an easy API for sending and receiving messages over mesh radios. It also provides access to any of the operations/data available in the device user interface or the Android application. Events are delivered using a publish-subscribe model, and you can subscribe to only the message types you are interested in.

View File

@@ -77,9 +77,11 @@ from pubsub import pub
from dotmap import DotMap
from tabulate import tabulate
from google.protobuf.json_format import MessageToJson
from .util import fixme, catchAndIgnore, stripnl, DeferredExecution, Timeout
from .node import Node
from . import mesh_pb2, portnums_pb2, apponly_pb2, admin_pb2, environmental_measurement_pb2, remote_hardware_pb2, channel_pb2, radioconfig_pb2, util
from meshtastic.util import fixme, catchAndIgnore, stripnl, DeferredExecution, Timeout
from meshtastic.node import Node
from meshtastic import (mesh_pb2, portnums_pb2, apponly_pb2, admin_pb2,
environmental_measurement_pb2, remote_hardware_pb2,
channel_pb2, radioconfig_pb2, util)
# Note: To follow PEP224, comments should be after the module variable.

View File

@@ -5,6 +5,7 @@
import argparse
import platform
import logging
import os
import sys
import time
import yaml
@@ -13,14 +14,13 @@ import pyqrcode
import pkg_resources
import meshtastic.util
import meshtastic.test
from . import remote_hardware
from . import portnums_pb2, channel_pb2, radioconfig_pb2
from .globals import Globals
from meshtastic import remote_hardware
from meshtastic.ble_interface import BLEInterface
from meshtastic import portnums_pb2, channel_pb2, radioconfig_pb2
from meshtastic.globals import Globals
from meshtastic.__init__ import BROADCAST_ADDR
have_tunnel = platform.system() == 'Linux'
"""We only import the tunnel code if we are on a platform that can run it. """
def onReceive(packet, interface):
"""Callback invoked when a packet arrives"""
our_globals = Globals.getInstance()
@@ -45,7 +45,7 @@ def onReceive(packet, interface):
interface.sendText(reply)
except Exception as ex:
print(ex)
print(f'Warning: There is no field {ex} in the packet.')
def onConnection(interface, topic=pub.AUTO_TOPIC):
@@ -137,15 +137,9 @@ def onConnected(interface):
our_globals = Globals.getInstance()
args = our_globals.get_args()
print("Connected to radio")
def getNode():
"""This operation could be expensive, so we try to cache the results"""
targetNode = our_globals.get_target_node()
if not targetNode:
targetNode = interface.getNode(args.destOrLocal)
our_globals.set_target_node(targetNode)
return targetNode
# do not print this line if we are exporting the config
if not args.export_config:
print("Connected to radio")
if args.setlat or args.setlon or args.setalt:
closeNow = True
@@ -178,12 +172,12 @@ def onConnected(interface):
if args.set_owner:
closeNow = True
print(f"Setting device owner to {args.set_owner}")
getNode().setOwner(args.set_owner)
interface.getNode(args.dest).setOwner(args.set_owner)
if args.pos_fields:
# If --pos-fields invoked with args, set position fields
closeNow = True
prefs = getNode().radioConfig.preferences
prefs = interface.getNode(args.dest).radioConfig.preferences
allFields = 0
try:
@@ -200,12 +194,12 @@ def onConnected(interface):
print(f"Setting position fields to {allFields}")
setPref(prefs, 'position_flags', ('%d' % allFields))
print("Writing modified preferences to device")
getNode().writeConfig()
interface.getNode(args.dest).writeConfig()
elif args.pos_fields is not None:
# If --pos-fields invoked without args, read and display current value
closeNow = True
prefs = getNode().radioConfig.preferences
prefs = interface.getNode(args.dest).radioConfig.preferences
fieldNames = []
for bit in radioconfig_pb2.PositionFlags.values():
@@ -224,81 +218,85 @@ def onConnected(interface):
print(sorted(meshtastic.mesh_pb2.Team.keys()))
else:
print(f"Setting team to {meshtastic.mesh_pb2.Team.Name(v_team)}")
getNode().setOwner(team=v_team)
interface.getNode(args.dest).setOwner(team=v_team)
if args.set_ham:
closeNow = True
print(f"Setting Ham ID to {args.set_ham} and turning off encryption")
getNode().setOwner(args.set_ham, is_licensed=True)
interface.getNode(args.dest).setOwner(args.set_ham, is_licensed=True)
# Must turn off encryption on primary channel
getNode().turnOffEncryptionOnPrimaryChannel()
interface.getNode(args.dest).turnOffEncryptionOnPrimaryChannel()
if args.reboot:
closeNow = True
getNode().reboot()
interface.getNode(args.dest).reboot()
if args.sendtext:
closeNow = True
channelIndex = 0
if args.ch_index is not None:
channelIndex = int(args.ch_index)
ch = getNode().getChannelByChannelIndex(channelIndex)
ch = interface.localNode.getChannelByChannelIndex(channelIndex)
logging.debug(f'ch:{ch}')
if ch and ch.role != channel_pb2.Channel.Role.DISABLED:
print(f"Sending text message {args.sendtext} to {args.destOrAll} on channelIndex:{channelIndex}")
interface.sendText(args.sendtext, args.destOrAll, wantAck=True, channelIndex=channelIndex)
print(f"Sending text message {args.sendtext} to {args.dest} on channelIndex:{channelIndex}")
interface.sendText(args.sendtext, args.dest, wantAck=True, channelIndex=channelIndex)
else:
meshtastic.util.our_exit(f"Warning: {channelIndex} is not a valid channel. Channel must not be DISABLED.")
if args.sendping:
payload = str.encode("test string")
print(f"Sending ping message to {args.destOrAll}")
interface.sendData(payload, args.destOrAll, portNum=portnums_pb2.PortNum.REPLY_APP,
print(f"Sending ping message to {args.dest}")
interface.sendData(payload, args.dest, portNum=portnums_pb2.PortNum.REPLY_APP,
wantAck=True, wantResponse=True)
if args.gpio_wrb or args.gpio_rd or args.gpio_watch:
rhc = remote_hardware.RemoteHardwareClient(interface)
if args.dest == BROADCAST_ADDR:
meshtastic.util.our_exit("Warning: Must use a destination node ID.")
else:
rhc = remote_hardware.RemoteHardwareClient(interface)
if args.gpio_wrb:
bitmask = 0
bitval = 0
for wrpair in (args.gpio_wrb or []):
bitmask |= 1 << int(wrpair[0])
bitval |= int(wrpair[1]) << int(wrpair[0])
print(f"Writing GPIO mask 0x{bitmask:x} with value 0x{bitval:x} to {args.dest}")
rhc.writeGPIOs(args.dest, bitmask, bitval)
closeNow = True
if args.gpio_wrb:
bitmask = 0
bitval = 0
for wrpair in (args.gpio_wrb or []):
bitmask |= 1 << int(wrpair[0])
bitval |= int(wrpair[1]) << int(wrpair[0])
print(f"Writing GPIO mask 0x{bitmask:x} with value 0x{bitval:x} to {args.dest}")
rhc.writeGPIOs(args.dest, bitmask, bitval)
closeNow = True
if args.gpio_rd:
bitmask = int(args.gpio_rd, 16)
print(f"Reading GPIO mask 0x{bitmask:x} from {args.dest}")
interface.mask = bitmask
rhc.readGPIOs(args.dest, bitmask, None)
if not interface.noProto:
# wait up to X seconds for a response
for _ in range(10):
if args.gpio_rd:
bitmask = int(args.gpio_rd, 16)
print(f"Reading GPIO mask 0x{bitmask:x} from {args.dest}")
interface.mask = bitmask
rhc.readGPIOs(args.dest, bitmask, None)
if not interface.noProto:
# wait up to X seconds for a response
for _ in range(10):
time.sleep(1)
if interface.gotResponse:
break
logging.debug(f'end of gpio_rd')
if args.gpio_watch:
bitmask = int(args.gpio_watch, 16)
print(f"Watching GPIO mask 0x{bitmask:x} from {args.dest}. Press ctrl-c to exit")
while True:
rhc.watchGPIOs(args.dest, bitmask)
time.sleep(1)
if interface.gotResponse:
break
logging.debug(f'end of gpio_rd')
if args.gpio_watch:
bitmask = int(args.gpio_watch, 16)
print(f"Watching GPIO mask 0x{bitmask:x} from {args.dest}. Press ctrl-c to exit")
while True:
rhc.watchGPIOs(args.dest, bitmask)
time.sleep(1)
# handle settings
if args.set:
closeNow = True
prefs = getNode().radioConfig.preferences
prefs = interface.getNode(args.dest).radioConfig.preferences
# Handle the int/float/bool arguments
for pref in args.set:
setPref(prefs, pref[0], pref[1])
print("Writing modified preferences to device")
getNode().writeConfig()
interface.getNode(args.dest).writeConfig()
if args.configure:
with open(args.configure[0], encoding='utf8') as file:
@@ -307,11 +305,11 @@ def onConnected(interface):
if 'owner' in configuration:
print(f"Setting device owner to {configuration['owner']}")
getNode().setOwner(configuration['owner'])
interface.getNode(args.dest).setOwner(configuration['owner'])
if 'channel_url' in configuration:
print("Setting channel url to", configuration['channel_url'])
getNode().setURL(configuration['channel_url'])
interface.getNode(args.dest).setURL(configuration['channel_url'])
if 'location' in configuration:
alt = 0
@@ -336,11 +334,11 @@ def onConnected(interface):
interface.localNode.writeConfig()
if 'user_prefs' in configuration:
prefs = getNode().radioConfig.preferences
prefs = interface.getNode(args.dest).radioConfig.preferences
for pref in configuration['user_prefs']:
setPref(prefs, pref, str(configuration['user_prefs'][pref]))
print("Writing modified preferences to device")
getNode().writeConfig()
interface.getNode(args.dest).writeConfig()
if args.export_config:
# export the configuration (the opposite of '--configure')
@@ -349,7 +347,7 @@ def onConnected(interface):
if args.seturl:
closeNow = True
getNode().setURL(args.seturl)
interface.getNode(args.dest).setURL(args.seturl)
# handle changing channels
@@ -357,7 +355,7 @@ def onConnected(interface):
closeNow = True
if len(args.ch_add) > 10:
meshtastic.util.our_exit("Warning: Channel name must be shorter. Channel not added.")
n = getNode()
n = interface.getNode(args.dest)
ch = n.getChannelByName(args.ch_add)
if ch:
meshtastic.util.our_exit(f"Warning: This node already has a '{args.ch_add}' channel. No changes were made.")
@@ -385,7 +383,7 @@ def onConnected(interface):
meshtastic.util.our_exit("Warning: Cannot delete primary channel.", 1)
else:
print(f"Deleting channel {channelIndex}")
ch = getNode().deleteChannel(channelIndex)
ch = interface.getNode(args.dest).deleteChannel(channelIndex)
ch_changes = [args.ch_longslow, args.ch_longfast,
args.ch_mediumslow, args.ch_mediumfast,
@@ -401,7 +399,7 @@ def onConnected(interface):
channelIndex = 0
else:
meshtastic.util.our_exit("Warning: Need to specify '--ch-index'.", 1)
ch = getNode().channels[channelIndex]
ch = interface.getNode(args.dest).channels[channelIndex]
if any_primary_channel_changes or args.ch_enable or args.ch_disable:
@@ -462,21 +460,22 @@ def onConnected(interface):
ch.role = channel_pb2.Channel.Role.DISABLED
print(f"Writing modified channels to device")
getNode().writeChannel(channelIndex)
interface.getNode(args.dest).writeChannel(channelIndex)
if args.info:
print("")
if not args.dest: # If we aren't trying to talk to our local node, don't show it
# If we aren't trying to talk to our local node, don't show it
if args.dest == BROADCAST_ADDR:
interface.showInfo()
print("")
getNode().showInfo()
interface.getNode(args.dest).showInfo()
closeNow = True # FIXME, for now we leave the link up while talking to remote nodes
print("")
if args.get:
closeNow = True
prefs = getNode().radioConfig.preferences
prefs = interface.getNode(args.dest).radioConfig.preferences
# Handle the int/float/bool arguments
for pref in args.get:
@@ -495,12 +494,16 @@ def onConnected(interface):
qr = pyqrcode.create(url)
print(qr.terminal())
have_tunnel = platform.system() == 'Linux'
if have_tunnel and args.tunnel:
# pylint: disable=C0415
from . import tunnel
# Even if others said we could close, stay open if the user asked for a tunnel
closeNow = False
tunnel.Tunnel(interface, subnet=args.tunnel_net)
if interface.noProto:
logging.warning(f"Not starting Tunnel - disabled by noProto")
else:
tunnel.Tunnel(interface, subnet=args.tunnel_net)
# if the user didn't ask for serial debugging output, we might want to exit after we've done our operation
if (not args.seriallog) and closeNow:
@@ -586,13 +589,8 @@ def common():
channelIndex = int(args.ch_index)
our_globals.set_channel_index(channelIndex)
# Some commands require dest to be set, so we now use destOrAll/destOrLocal for more lenient commands
if not args.dest:
args.destOrAll = "^all"
args.destOrLocal = "^local"
else:
args.destOrAll = args.dest
args.destOrLocal = args.dest # FIXME, temp hack for debugging remove
args.dest = BROADCAST_ADDR
if not args.seriallog:
if args.noproto:
@@ -627,19 +625,25 @@ def common():
subscribe()
if args.ble:
client = meshtastic.ble_interface.BLEInterface(args.ble, debugOut=logfile, noProto=args.noproto)
client = BLEInterface(args.ble, debugOut=logfile, noProto=args.noproto)
elif args.host:
client = meshtastic.tcp_interface.TCPInterface(
args.host, debugOut=logfile, noProto=args.noproto)
client = meshtastic.tcp_interface.TCPInterface(args.host, debugOut=logfile, noProto=args.noproto)
else:
client = meshtastic.serial_interface.SerialInterface(
args.port, debugOut=logfile, noProto=args.noproto)
try:
client = meshtastic.serial_interface.SerialInterface(args.port, debugOut=logfile, noProto=args.noproto)
except PermissionError as ex:
username = os.getlogin()
message = "Permission Error:\n"
message += " Need to add yourself to the 'dialout' group by running:\n"
message += f" sudo usermod -a -G dialout {username}\n"
message += " After running that command, log out and re-login for it to take effect.\n"
message += f"Error was:{ex}"
meshtastic.util.our_exit(message)
# We assume client is fully connected now
onConnected(client)
#if logfile:
#logfile.close()
have_tunnel = platform.system() == 'Linux'
if args.noproto or args.reply or (have_tunnel and args.tunnel): # loop until someone presses ctrlc
while True:
time.sleep(1000)
@@ -805,16 +809,18 @@ def initParser():
parser.add_argument('--unset-router', dest='deprecated',
action='store_false', help='Deprecated, use "--set is_router false" instead')
have_tunnel = platform.system() == 'Linux'
if have_tunnel:
parser.add_argument('--tunnel',
action='store_true', help="Create a TUN tunnel device for forwarding IP packets over the mesh")
parser.add_argument(
"--subnet", dest='tunnel_net', help="Sets the local-end subnet address for the TUN IP bridge", default=None)
parser.add_argument('--tunnel', action='store_true',
help="Create a TUN tunnel device for forwarding IP packets over the mesh")
parser.add_argument("--subnet", dest='tunnel_net',
help="Sets the local-end subnet address for the TUN IP bridge. (ex: 10.115' which is the default)",
default=None)
parser.set_defaults(deprecated=None)
parser.add_argument('--version', action='version',
version=f"{pkg_resources.require('meshtastic')[0].version}")
the_version = pkg_resources.get_distribution("meshtastic").version
parser.add_argument('--version', action='version', version=f"{the_version}")
parser.add_argument(
"--support", action='store_true', help="Show support info (useful when troubleshooting an issue)")

View File

@@ -4,7 +4,7 @@ import logging
import pygatt
from .mesh_interface import MeshInterface
from meshtastic.mesh_interface import MeshInterface
# Our standard BLE characteristics
TORADIO_UUID = "f75c76d2-129e-4dad-a1dd-7866124401e7"

View File

@@ -27,16 +27,17 @@ class Globals:
Globals.__instance = self
self.args = None
self.parser = None
self.target_node = None
self.channel_index = None
self.logfile = None
self.tunnelInstance = None
def reset(self):
"""Reset all of our globals. If you add a member, add it to this method, too."""
self.args = None
self.parser = None
self.target_node = None
self.channel_index = None
self.logfile = None
self.tunnelInstance = None
# setters
def set_args(self, args):
@@ -47,10 +48,6 @@ class Globals:
"""Set the parser"""
self.parser = parser
def set_target_node(self, target_node):
"""Set the target_node"""
self.target_node = target_node
def set_channel_index(self, channel_index):
"""Set the channel_index"""
self.channel_index = channel_index
@@ -59,6 +56,10 @@ class Globals:
"""Set the logfile"""
self.logfile = logfile
def set_tunnelInstance(self, tunnelInstance):
"""Set the tunnelInstance"""
self.tunnelInstance = tunnelInstance
# getters
def get_args(self):
"""Get args"""
@@ -68,10 +69,6 @@ class Globals:
"""Get parser"""
return self.parser
def get_target_node(self):
"""Get target_node"""
return self.target_node
def get_channel_index(self):
"""Get channel_index"""
return self.channel_index
@@ -79,3 +76,7 @@ class Globals:
def get_logfile(self):
"""Get logfile"""
return self.logfile
def get_tunnelInstance(self):
"""Get tunnelInstance"""
return self.tunnelInstance

View File

@@ -17,9 +17,9 @@ from google.protobuf.json_format import MessageToJson
import meshtastic.node
from . import portnums_pb2, mesh_pb2
from .util import stripnl, Timeout, our_exit, remove_keys_from_dict
from .__init__ import LOCAL_ADDR, BROADCAST_NUM, BROADCAST_ADDR, ResponseHandler, publishingThread, OUR_APP_VERSION, protocols
from meshtastic import portnums_pb2, mesh_pb2
from meshtastic.util import stripnl, Timeout, our_exit, remove_keys_from_dict, convert_mac_addr
from meshtastic.__init__ import LOCAL_ADDR, BROADCAST_NUM, BROADCAST_ADDR, ResponseHandler, publishingThread, OUR_APP_VERSION, protocols
class MeshInterface:
"""Interface class for meshtastic devices
@@ -68,8 +68,7 @@ class MeshInterface:
def __exit__(self, exc_type, exc_value, traceback):
if exc_type is not None and exc_value is not None:
logging.error(
f'An exception of type {exc_type} with value {exc_value} has occurred')
logging.error(f'An exception of type {exc_type} with value {exc_value} has occurred')
if traceback is not None:
logging.error(f'Traceback: {traceback}')
self.close()
@@ -84,9 +83,18 @@ class MeshInterface:
nodes = ""
if self.nodes:
for n in self.nodes.values():
# when the TBeam is first booted, it sometimes shows the 'raw' data
# when the TBeam is first booted, it sometimes shows the raw data
# so, we will just remove any raw keys
n2 = remove_keys_from_dict('raw', n)
keys_to_remove = ('raw', 'decoded', 'payload')
n2 = remove_keys_from_dict(keys_to_remove, n)
# if we have 'macaddr', re-format it
if 'macaddr' in n2['user']:
val = n2['user']['macaddr']
# decode the base64 value
addr = convert_mac_addr(val)
n2['user']['macaddr'] = addr
nodes = nodes + f" {stripnl(n2)}"
infos = owner + myinfo + mesh + nodes
print(infos)
@@ -151,7 +159,7 @@ class MeshInterface:
def getNode(self, nodeId):
"""Return a node object which contains device settings and channel info"""
if nodeId == LOCAL_ADDR:
if nodeId in (LOCAL_ADDR, BROADCAST_ADDR):
return self.localNode
else:
n = meshtastic.node.Node(self, nodeId)
@@ -385,11 +393,11 @@ class MeshInterface:
return user.get('shortName', None)
return None
def _waitConnected(self):
def _waitConnected(self, timeout=15.0):
"""Block until the initial node db download is complete, or timeout
and raise an exception"""
if not self.noProto:
if not self.isConnected.wait(15.0): # timeout after x seconds
if not self.isConnected.wait(timeout): # timeout after x seconds
raise Exception("Timed out waiting for connection completion")
# If we failed while connecting, raise the connection to the client
@@ -407,8 +415,7 @@ class MeshInterface:
def _disconnected(self):
"""Called by subclasses to tell clients this interface has disconnected"""
self.isConnected.clear()
publishingThread.queueWork(lambda: pub.sendMessage(
"meshtastic.connection.lost", interface=self))
publishingThread.queueWork(lambda: pub.sendMessage("meshtastic.connection.lost", interface=self))
def _startHeartbeat(self):
"""We need to send a heartbeat message to the device every X seconds"""
@@ -434,8 +441,7 @@ class MeshInterface:
if not self.isConnected.is_set():
self.isConnected.set()
self._startHeartbeat()
publishingThread.queueWork(lambda: pub.sendMessage(
"meshtastic.connection.established", interface=self))
publishingThread.queueWork(lambda: pub.sendMessage("meshtastic.connection.established", interface=self))
def _startConfig(self):
"""Start device packets flowing"""
@@ -507,7 +513,8 @@ class MeshInterface:
elif fromRadio.HasField("node_info"):
node = asDict["nodeInfo"]
try:
self._fixupPosition(node["position"])
newpos = self._fixupPosition(node["position"])
node["position"] = newpos
except:
logging.debug("Node without position")
@@ -539,12 +546,14 @@ class MeshInterface:
"""Convert integer lat/lon into floats
Arguments:
position {Position dictionary} -- object ot fix up
position {Position dictionary} -- object to fix up
Returns the position with the updated keys
"""
if "latitudeI" in position:
position["latitude"] = position["latitudeI"] * 1e-7
if "longitudeI" in position:
position["longitude"] = position["longitudeI"] * 1e-7
return position
def _nodeNumToId(self, num):
"""Map a node node number to a node ID

View File

@@ -4,8 +4,8 @@
import logging
import base64
from google.protobuf.json_format import MessageToJson
from . import portnums_pb2, apponly_pb2, admin_pb2, channel_pb2
from .util import pskToString, stripnl, Timeout, our_exit, fromPSK
from meshtastic import portnums_pb2, apponly_pb2, admin_pb2, channel_pb2
from meshtastic.util import pskToString, stripnl, Timeout, our_exit, fromPSK
@@ -257,6 +257,7 @@ class Node:
p = admin_pb2.AdminMessage()
p.get_radio_request = True
# TODO: should we check that localNode has an 'admin' channel?
# Show progress message for super slow operations
if self != self.iface.localNode:
print("Requesting preferences from remote node.")

View File

@@ -2,8 +2,8 @@
"""
import logging
from pubsub import pub
from . import portnums_pb2, remote_hardware_pb2
from .util import our_exit
from meshtastic import portnums_pb2, remote_hardware_pb2
from meshtastic.util import our_exit
def onGPIOreceive(packet, interface):

View File

@@ -6,7 +6,7 @@ import platform
import serial
import meshtastic.util
from .stream_interface import StreamInterface
from meshtastic.stream_interface import StreamInterface
if platform.system() != 'Windows':
import termios
@@ -40,28 +40,25 @@ class SerialInterface(StreamInterface):
# first we need to set the HUPCL so the device will not reboot based on RTS and/or DTR
# see https://github.com/pyserial/pyserial/issues/124
if not self.noProto:
if platform.system() != 'Windows':
with open(devPath, encoding='utf8') as f:
attrs = termios.tcgetattr(f)
attrs[2] = attrs[2] & ~termios.HUPCL
termios.tcsetattr(f, termios.TCSAFLUSH, attrs)
f.close()
time.sleep(0.1)
if platform.system() != 'Windows':
with open(devPath, encoding='utf8') as f:
attrs = termios.tcgetattr(f)
attrs[2] = attrs[2] & ~termios.HUPCL
termios.tcsetattr(f, termios.TCSAFLUSH, attrs)
f.close()
time.sleep(0.1)
self.stream = serial.Serial(devPath, 921600, exclusive=True, timeout=0.5, write_timeout=0)
if not self.noProto:
self.stream.flush()
time.sleep(0.1)
self.stream.flush()
time.sleep(0.1)
StreamInterface.__init__(self, debugOut=debugOut, noProto=noProto, connectNow=connectNow)
def close(self):
"""Close a connection to the device"""
if not self.noProto:
self.stream.flush()
time.sleep(0.1)
self.stream.flush()
time.sleep(0.1)
self.stream.flush()
time.sleep(0.1)
self.stream.flush()
time.sleep(0.1)
logging.debug("Closing Serial stream")
StreamInterface.close(self)

View File

@@ -7,8 +7,8 @@ import traceback
import serial
from .mesh_interface import MeshInterface
from .util import stripnl
from meshtastic.mesh_interface import MeshInterface
from meshtastic.util import stripnl
START1 = 0x94

View File

@@ -4,7 +4,7 @@ import logging
import socket
from typing import AnyStr
from .stream_interface import StreamInterface
from meshtastic.stream_interface import StreamInterface
class TCPInterface(StreamInterface):
"""Interface class for meshtastic devices over a TCP link"""
@@ -17,8 +17,6 @@ class TCPInterface(StreamInterface):
hostname {string} -- Hostname/IP address of the device to connect to
"""
# Instead of wrapping as a stream, we use the native socket API
# self.stream = sock.makefile('rw')
self.stream = None
self.hostname = hostname

View File

@@ -8,9 +8,9 @@ import traceback
from dotmap import DotMap
from pubsub import pub
import meshtastic.util
from .__init__ import BROADCAST_NUM
from .serial_interface import SerialInterface
from .tcp_interface import TCPInterface
from meshtastic.__init__ import BROADCAST_NUM
from meshtastic.serial_interface import SerialInterface
from meshtastic.tcp_interface import TCPInterface
"""The interfaces we are using for our tests"""

View File

@@ -6,13 +6,21 @@ import pytest
@pytest.mark.int
def test_int_no_args():
"""Test without any args"""
def test_int_meshtastic_no_args():
"""Test meshtastic without any args"""
return_value, out = subprocess.getstatusoutput('meshtastic')
assert re.match(r'usage: meshtastic', out)
assert return_value == 1
@pytest.mark.int
def test_int_mesh_tunnel_no_args():
"""Test mesh-tunnel without any args"""
return_value, out = subprocess.getstatusoutput('mesh-tunnel')
assert re.match(r'usage: mesh-tunnel', out)
assert return_value == 1
@pytest.mark.int
def test_int_version():
"""Test '--version'."""

View File

@@ -5,16 +5,17 @@ import sys
import os
import re
import logging
import platform
from unittest.mock import patch, MagicMock
import pytest
from meshtastic.__main__ import initParser, main, Globals, onReceive, onConnection, export_config, getPref, setPref
from meshtastic.__main__ import initParser, main, Globals, onReceive, onConnection, export_config, getPref, setPref, onNode, tunnelMain
#from ..radioconfig_pb2 import UserPreferences
import meshtastic.radioconfig_pb2
from ..serial_interface import SerialInterface
from ..tcp_interface import TCPInterface
from ..ble_interface import BLEInterface
#from ..ble_interface import BLEInterface
from ..node import Node
from ..channel_pb2 import Channel
from ..remote_hardware import onGPIOreceive
@@ -62,7 +63,7 @@ def test_main_main_version(capsys, reset_globals):
@pytest.mark.unit
def test_main_main_no_args(reset_globals):
def test_main_main_no_args(reset_globals, capsys):
"""Test with no args"""
sys.argv = ['']
Globals.getInstance().set_args(sys.argv)
@@ -71,6 +72,8 @@ def test_main_main_no_args(reset_globals):
main()
assert pytest_wrapped_e.type == SystemExit
assert pytest_wrapped_e.value.code == 1
_, err = capsys.readouterr()
assert re.search(r'usage:', err, re.MULTILINE)
@pytest.mark.unit
@@ -111,38 +114,41 @@ def test_main_ch_index_no_devices(patched_find_ports, capsys, reset_globals):
@pytest.mark.unit
@patch('meshtastic.util.findPorts', return_value=[])
def test_main_test_no_ports(patched_find_ports, reset_globals):
def test_main_test_no_ports(patched_find_ports, reset_globals, capsys):
"""Test --test with no hardware"""
sys.argv = ['', '--test']
Globals.getInstance().set_args(sys.argv)
assert Globals.getInstance().get_target_node() is None
with pytest.raises(SystemExit) as pytest_wrapped_e:
main()
assert pytest_wrapped_e.type == SystemExit
assert pytest_wrapped_e.value.code == 1
patched_find_ports.assert_called()
out, err = capsys.readouterr()
assert re.search(r'Warning: Must have at least two devices connected to USB', out, re.MULTILINE)
assert err == ''
@pytest.mark.unit
@patch('meshtastic.util.findPorts', return_value=['/dev/ttyFake1'])
def test_main_test_one_port(patched_find_ports, reset_globals):
def test_main_test_one_port(patched_find_ports, reset_globals, capsys):
"""Test --test with one fake port"""
sys.argv = ['', '--test']
Globals.getInstance().set_args(sys.argv)
assert Globals.getInstance().get_target_node() is None
with pytest.raises(SystemExit) as pytest_wrapped_e:
main()
assert pytest_wrapped_e.type == SystemExit
assert pytest_wrapped_e.value.code == 1
patched_find_ports.assert_called()
out, err = capsys.readouterr()
assert re.search(r'Warning: Must have at least two devices connected to USB', out, re.MULTILINE)
assert err == ''
@pytest.mark.unit
@patch('meshtastic.test.testAll', return_value=True)
@patch('meshtastic.util.findPorts', return_value=['/dev/ttyFake1', '/dev/ttyFake2'])
def test_main_test_two_ports_success(patched_find_ports, patched_test_all, reset_globals):
def test_main_test_two_ports_success(patched_test_all, reset_globals, capsys):
"""Test --test two fake ports and testAll() is a simulated success"""
sys.argv = ['', '--test']
Globals.getInstance().set_args(sys.argv)
@@ -151,14 +157,15 @@ def test_main_test_two_ports_success(patched_find_ports, patched_test_all, reset
main()
assert pytest_wrapped_e.type == SystemExit
assert pytest_wrapped_e.value.code == 0
# TODO: why does this fail? patched_find_ports.assert_called()
patched_test_all.assert_called()
out, err = capsys.readouterr()
assert re.search(r'Test was a success.', out, re.MULTILINE)
assert err == ''
@pytest.mark.unit
@patch('meshtastic.test.testAll', return_value=False)
@patch('meshtastic.util.findPorts', return_value=['/dev/ttyFake1', '/dev/ttyFake2'])
def test_main_test_two_ports_fails(patched_find_ports, patched_test_all, reset_globals):
def test_main_test_two_ports_fails(patched_test_all, reset_globals, capsys):
"""Test --test two fake ports and testAll() is a simulated failure"""
sys.argv = ['', '--test']
Globals.getInstance().set_args(sys.argv)
@@ -167,12 +174,14 @@ def test_main_test_two_ports_fails(patched_find_ports, patched_test_all, reset_g
main()
assert pytest_wrapped_e.type == SystemExit
assert pytest_wrapped_e.value.code == 1
# TODO: why does this fail? patched_find_ports.assert_called()
patched_test_all.assert_called()
out, err = capsys.readouterr()
assert re.search(r'Test was not successful.', out, re.MULTILINE)
assert err == ''
@pytest.mark.unit
def test_main_info(capsys, reset_globals):
def test_main_info(capsys, caplog, reset_globals):
"""Test --info"""
sys.argv = ['', '--info']
Globals.getInstance().set_args(sys.argv)
@@ -181,13 +190,37 @@ def test_main_info(capsys, reset_globals):
def mock_showInfo():
print('inside mocked showInfo')
iface.showInfo.side_effect = mock_showInfo
with patch('meshtastic.serial_interface.SerialInterface', return_value=iface) as mo:
main()
with caplog.at_level(logging.DEBUG):
with patch('meshtastic.serial_interface.SerialInterface', return_value=iface) as mo:
main()
out, err = capsys.readouterr()
assert re.search(r'Connected to radio', out, re.MULTILINE)
assert re.search(r'inside mocked showInfo', out, re.MULTILINE)
assert err == ''
mo.assert_called()
@pytest.mark.unit
@patch('os.getlogin')
def test_main_info_with_permission_error(patched_getlogin, capsys, caplog, reset_globals):
"""Test --info"""
sys.argv = ['', '--info']
Globals.getInstance().set_args(sys.argv)
patched_getlogin.return_value = 'me'
iface = MagicMock(autospec=SerialInterface)
with caplog.at_level(logging.DEBUG):
with pytest.raises(SystemExit) as pytest_wrapped_e:
with patch('meshtastic.serial_interface.SerialInterface', return_value=iface) as mo:
mo.side_effect = PermissionError('bla bla')
main()
assert pytest_wrapped_e.type == SystemExit
assert pytest_wrapped_e.value.code == 1
out, err = capsys.readouterr()
assert re.search(r'Connected to radio', out, re.MULTILINE)
assert re.search(r'inside mocked showInfo', out, re.MULTILINE)
patched_getlogin.assert_called()
assert re.search(r'Need to add yourself', out, re.MULTILINE)
assert err == ''
mo.assert_called()
@pytest.mark.unit
@@ -209,23 +242,24 @@ def test_main_info_with_tcp_interface(capsys, reset_globals):
mo.assert_called()
@pytest.mark.unit
def test_main_info_with_ble_interface(capsys, reset_globals):
"""Test --info"""
sys.argv = ['', '--info', '--ble', 'foo']
Globals.getInstance().set_args(sys.argv)
iface = MagicMock(autospec=BLEInterface)
def mock_showInfo():
print('inside mocked showInfo')
iface.showInfo.side_effect = mock_showInfo
with patch('meshtastic.ble_interface.BLEInterface', return_value=iface) as mo:
main()
out, err = capsys.readouterr()
assert re.search(r'Connected to radio', out, re.MULTILINE)
assert re.search(r'inside mocked showInfo', out, re.MULTILINE)
assert err == ''
mo.assert_called()
# TODO: comment out ble (for now)
#@pytest.mark.unit
#def test_main_info_with_ble_interface(capsys, reset_globals):
# """Test --info"""
# sys.argv = ['', '--info', '--ble', 'foo']
# Globals.getInstance().set_args(sys.argv)
#
# iface = MagicMock(autospec=BLEInterface)
# def mock_showInfo():
# print('inside mocked showInfo')
# iface.showInfo.side_effect = mock_showInfo
# with patch('meshtastic.ble_interface.BLEInterface', return_value=iface) as mo:
# main()
# out, err = capsys.readouterr()
# assert re.search(r'Connected to radio', out, re.MULTILINE)
# assert re.search(r'inside mocked showInfo', out, re.MULTILINE)
# assert err == ''
# mo.assert_called()
@pytest.mark.unit
@@ -444,63 +478,71 @@ def test_main_sendtext_with_channel(capsys, reset_globals):
@pytest.mark.unit
def test_main_sendtext_with_invalid_channel(capsys, reset_globals):
def test_main_sendtext_with_invalid_channel(caplog, capsys, reset_globals):
"""Test --sendtext"""
sys.argv = ['', '--sendtext', 'hello', '--ch-index', '-1']
Globals.getInstance().set_args(sys.argv)
iface = MagicMock(autospec=SerialInterface)
iface.getChannelByChannelIndex.return_value = None
with patch('meshtastic.serial_interface.SerialInterface', return_value=iface) as mo:
iface.getNode.return_value.getChannelByChannelIndex.return_value = None
with pytest.raises(SystemExit) as pytest_wrapped_e:
main()
assert pytest_wrapped_e.type == SystemExit
assert pytest_wrapped_e.value.code == 1
out, err = capsys.readouterr()
assert re.search(r'is not a valid channel', out, re.MULTILINE)
assert err == ''
mo.assert_called()
iface.localNode.getChannelByChannelIndex.return_value = None
with caplog.at_level(logging.DEBUG):
with patch('meshtastic.serial_interface.SerialInterface', return_value=iface) as mo:
with pytest.raises(SystemExit) as pytest_wrapped_e:
main()
assert pytest_wrapped_e.type == SystemExit
assert pytest_wrapped_e.value.code == 1
out, err = capsys.readouterr()
assert re.search(r'is not a valid channel', out, re.MULTILINE)
assert err == ''
mo.assert_called()
@pytest.mark.unit
def test_main_sendtext_with_invalid_channel_nine(capsys, reset_globals):
def test_main_sendtext_with_invalid_channel_nine(caplog, capsys, reset_globals):
"""Test --sendtext"""
sys.argv = ['', '--sendtext', 'hello', '--ch-index', '9']
Globals.getInstance().set_args(sys.argv)
iface = MagicMock(autospec=SerialInterface)
with patch('meshtastic.serial_interface.SerialInterface', return_value=iface) as mo:
iface.getNode.return_value.getChannelByChannelIndex.return_value = None
with pytest.raises(SystemExit) as pytest_wrapped_e:
main()
assert pytest_wrapped_e.type == SystemExit
assert pytest_wrapped_e.value.code == 1
out, err = capsys.readouterr()
assert re.search(r'is not a valid channel', out, re.MULTILINE)
assert err == ''
mo.assert_called()
iface.localNode.getChannelByChannelIndex.return_value = None
with caplog.at_level(logging.DEBUG):
with patch('meshtastic.serial_interface.SerialInterface', return_value=iface) as mo:
with pytest.raises(SystemExit) as pytest_wrapped_e:
main()
assert pytest_wrapped_e.type == SystemExit
assert pytest_wrapped_e.value.code == 1
out, err = capsys.readouterr()
assert re.search(r'is not a valid channel', out, re.MULTILINE)
assert err == ''
mo.assert_called()
@pytest.mark.unit
def test_main_sendtext_with_dest(capsys, reset_globals):
def test_main_sendtext_with_dest(capsys, caplog, reset_globals, iface_with_nodes):
"""Test --sendtext with --dest"""
sys.argv = ['', '--sendtext', 'hello', '--dest', 'foo']
Globals.getInstance().set_args(sys.argv)
iface = MagicMock(autospec=SerialInterface)
def mock_sendText(text, dest, wantAck, channelIndex):
print('inside mocked sendText')
iface.sendText.side_effect = mock_sendText
iface = iface_with_nodes
iface.myInfo.my_node_num = 2475227164
mocked_channel = MagicMock(autospec=Channel)
iface.localNode.getChannelByChannelIndex = mocked_channel
with patch('meshtastic.serial_interface.SerialInterface', return_value=iface) as mo:
main()
out, err = capsys.readouterr()
assert re.search(r'Connected to radio', out, re.MULTILINE)
assert re.search(r'Sending text message', out, re.MULTILINE)
assert re.search(r'inside mocked sendText', out, re.MULTILINE)
assert err == ''
mo.assert_called()
with patch('meshtastic.serial_interface.SerialInterface', return_value=iface):
with caplog.at_level(logging.DEBUG):
with pytest.raises(SystemExit) as pytest_wrapped_e:
main()
assert pytest_wrapped_e.type == SystemExit
assert pytest_wrapped_e.value.code == 1
out, err = capsys.readouterr()
assert re.search(r'Connected to radio', out, re.MULTILINE)
assert not re.search(r"Warning: 0 is not a valid channel", out, re.MULTILINE)
assert not re.search(r"There is a SECONDARY channel named 'admin'", out, re.MULTILINE)
assert re.search(r'Warning: NodeId foo not found in DB', out, re.MULTILINE)
assert err == ''
@pytest.mark.unit
@@ -1082,18 +1124,14 @@ def test_main_pos_fields_no_args(capsys, reset_globals):
pos_flags = MagicMock(autospec=meshtastic.radioconfig_pb2.PositionFlags)
with patch('meshtastic.serial_interface.SerialInterface') as mo:
mo().getNode().radioConfig.preferences.position_flags = 35
with patch('meshtastic.radioconfig_pb2.PositionFlags', return_value=pos_flags) as mrc:
# kind of cheating here, we are setting up the node
mocked_node = MagicMock(autospec=Node)
anode = mocked_node()
anode.radioConfig.preferences.position_flags = 35
Globals.getInstance().set_target_node(anode)
mrc.values.return_value = [0, 1, 2, 4, 8, 16, 32, 64, 128, 256]
# Note: When you use side_effect and a list, each call will use a value from the front of the list then
# remove that value from the list. If there are three values in the list, we expect it to be called
# three times.
mrc.Name.side_effect = [ 'POS_ALTITUDE', 'POS_ALT_MSL', 'POS_BATTERY' ]
mrc.Name.side_effect = ['POS_ALTITUDE', 'POS_ALT_MSL', 'POS_BATTERY']
main()
@@ -1174,13 +1212,9 @@ def test_main_get_with_valid_values(capsys, reset_globals):
with patch('meshtastic.serial_interface.SerialInterface') as mo:
# kind of cheating here, we are setting up the node
mocked_node = MagicMock(autospec=Node)
anode = mocked_node()
anode.radioConfig.preferences.wifi_ssid = 'foo'
anode.radioConfig.preferences.ls_secs = 300
anode.radioConfig.preferences.fixed_position = False
Globals.getInstance().set_target_node(anode)
mo().getNode().radioConfig.preferences.wifi_ssid = 'foo'
mo().getNode().radioConfig.preferences.ls_secs = 300
mo().getNode().radioConfig.preferences.fixed_position = False
main()
@@ -1232,18 +1266,23 @@ def test_main_setchan(capsys, reset_globals):
main()
assert pytest_wrapped_e.type == SystemExit
assert pytest_wrapped_e.value.code == 1
_, err = capsys.readouterr()
assert re.search(r'usage:', err, re.MULTILINE)
@pytest.mark.unit
def test_main_onReceive_empty(caplog, reset_globals):
def test_main_onReceive_empty(caplog, reset_globals, capsys):
"""Test onReceive"""
sys.argv = ['']
Globals.getInstance().set_args(sys.argv)
args = MagicMock()
Globals.getInstance().set_args(args)
iface = MagicMock(autospec=SerialInterface)
packet = {'decoded': 'foo'}
packet = {}
with caplog.at_level(logging.DEBUG):
onReceive(packet, iface)
assert re.search(r'in onReceive', caplog.text, re.MULTILINE)
out, err = capsys.readouterr()
assert re.search(r"Warning: There is no field 'to' in the packet.", out, re.MULTILINE)
assert err == ''
# TODO: use this captured position app message (might want/need in the future)
@@ -1258,7 +1297,7 @@ def test_main_onReceive_empty(caplog, reset_globals):
# }
@pytest.mark.unit
def test_main_onReceive_with_sendtext(caplog, reset_globals):
def test_main_onReceive_with_sendtext(caplog, capsys, reset_globals):
"""Test onReceive with sendtext
The entire point of this test is to make sure the interface.close() call
is made in onReceive().
@@ -1287,63 +1326,48 @@ def test_main_onReceive_with_sendtext(caplog, reset_globals):
onReceive(packet, iface)
assert re.search(r'in onReceive', caplog.text, re.MULTILINE)
mo.assert_called()
out, err = capsys.readouterr()
assert re.search(r'Sending text message hello to', out, re.MULTILINE)
assert err == ''
# TODO: re-write this with updated
# temp disable this test as we need to have a separate thread going with '--reply'
#@pytest.mark.unit
#def test_main_onReceive_with_reply(caplog, capsys, reset_globals):
# """Test onReceive with a reply
# To capture: on one device run '--sendtext aaa --reply' and on another
# device run '--sendtext bbb --reply', then back to the first device and
# run '--sendtext aaa2 --reply'. You should now see a "Sending reply" message.
# """
# sys.argv = ['', '--sendtext', 'hello', '--reply']
# Globals.getInstance().set_args(sys.argv)
#
# # Note: 'TEXT_MESSAGE_APP' value is 1
#
# send_packet = {
# 'to': 4294967295,
# 'decoded': {
# 'portnum': 1,
# 'payload': "hello"
# },
# 'id': 334776977,
# 'hop_limit': 3,
# 'want_ack': True
# }
#
# reply_packet = {
# 'from': 682968668,
# 'to': 4294967295,
# 'decoded': {
# 'portnum': 'TEXT_MESSAGE_APP',
# 'payload': b'bbb',
# 'text': 'bbb'
# },
# 'id': 1709936182,
# 'rxTime': 1640381999,
# 'rxSnr': 6.0,
# 'hopLimit': 3,
# 'raw': 'faked',
# 'fromId': '!28b5465c',
# 'toId': '^all'
# }
#
# iface = MagicMock(autospec=SerialInterface)
# iface.myInfo.my_node_num = 4294967295
#
# with patch('meshtastic.serial_interface.SerialInterface', return_value=iface) as mo:
# with caplog.at_level(logging.DEBUG):
# main()
# onReceive(send_packet, iface)
# onReceive(reply_packet, iface)
# assert re.search(r'in onReceive', caplog.text, re.MULTILINE)
# out, err = capsys.readouterr()
# assert re.search(r'got msg ', out, re.MULTILINE)
# assert err == ''
# mo.assert_called()
@pytest.mark.unit
def test_main_onReceive_with_text(caplog, capsys, reset_globals):
"""Test onReceive with text
"""
args = MagicMock()
args.sendtext.return_value = 'foo'
Globals.getInstance().set_args(args)
# Note: 'TEXT_MESSAGE_APP' value is 1
# Note: Some of this is faked below.
packet = {
'to': 4294967295,
'decoded': {
'portnum': 1,
'payload': "hello",
'text': "faked"
},
'id': 334776977,
'hop_limit': 3,
'want_ack': True,
'rxSnr': 6.0,
'hopLimit': 3,
'raw': 'faked',
'fromId': '!28b5465c',
'toId': '^all'
}
iface = MagicMock(autospec=SerialInterface)
iface.myInfo.my_node_num = 4294967295
with patch('meshtastic.serial_interface.SerialInterface', return_value=iface):
with caplog.at_level(logging.DEBUG):
onReceive(packet, iface)
assert re.search(r'in onReceive', caplog.text, re.MULTILINE)
out, err = capsys.readouterr()
assert re.search(r'Sending reply', out, re.MULTILINE)
assert err == ''
@pytest.mark.unit
@@ -1381,6 +1405,10 @@ fixed_position: true
position_flags: 35"""
export_config(mo)
out, err = capsys.readouterr()
# ensure we do not output this line
assert not re.search(r'Connected to radio', out, re.MULTILINE)
assert re.search(r'owner: foo', out, re.MULTILINE)
assert re.search(r'channel_url: bar', out, re.MULTILINE)
assert re.search(r'location:', out, re.MULTILINE)
@@ -1406,7 +1434,7 @@ def test_main_export_config_called_from_main(capsys, reset_globals):
with patch('meshtastic.serial_interface.SerialInterface', return_value=iface) as mo:
main()
out, err = capsys.readouterr()
assert re.search(r'Connected to radio', out, re.MULTILINE)
assert not re.search(r'Connected to radio', out, re.MULTILINE)
assert re.search(r'# start of Meshtastic configure yaml', out, re.MULTILINE)
assert err == ''
mo.assert_called()
@@ -1415,7 +1443,7 @@ def test_main_export_config_called_from_main(capsys, reset_globals):
@pytest.mark.unit
def test_main_gpio_rd_no_gpio_channel(capsys, reset_globals):
"""Test --gpio_rd with no named gpio channel"""
sys.argv = ['', '--gpio-rd', '0x10']
sys.argv = ['', '--gpio-rd', '0x10', '--dest', '!foo']
Globals.getInstance().set_args(sys.argv)
iface = MagicMock(autospec=SerialInterface)
@@ -1436,9 +1464,9 @@ def test_main_gpio_rd_no_dest(capsys, reset_globals):
sys.argv = ['', '--gpio-rd', '0x2000']
Globals.getInstance().set_args(sys.argv)
channel = Channel(index=1, role=1)
channel.settings.modem_config = 3
channel.settings.psk = b'\x01'
channel = Channel(index=2, role=2)
channel.settings.psk = b'\x8a\x94y\x0e\xc6\xc9\x1e5\x91\x12@\xa60\xa8\xb43\x87\x00\xf2K\x0e\xe7\x7fAz\xcd\xf5\xb0\x900\xa84'
channel.settings.name = 'gpio'
iface = MagicMock(autospec=SerialInterface)
iface.localNode.getChannelByName.return_value = channel
@@ -1524,6 +1552,36 @@ def test_main_getPref_valid_field(capsys, reset_globals):
assert err == ''
@pytest.mark.unit
def test_main_getPref_valid_field_string(capsys, reset_globals):
"""Test getPref() with a valid field and value as a string"""
prefs = MagicMock()
prefs.DESCRIPTOR.fields_by_name.get.return_value = 'wifi_ssid'
prefs.wifi_ssid = 'foo'
prefs.ls_secs = 300
prefs.fixed_position = False
getPref(prefs, 'wifi_ssid')
out, err = capsys.readouterr()
assert re.search(r'wifi_ssid: foo', out, re.MULTILINE)
assert err == ''
@pytest.mark.unit
def test_main_getPref_valid_field_bool(capsys, reset_globals):
"""Test getPref() with a valid field and value as a bool"""
prefs = MagicMock()
prefs.DESCRIPTOR.fields_by_name.get.return_value = 'fixed_position'
prefs.wifi_ssid = 'foo'
prefs.ls_secs = 300
prefs.fixed_position = False
getPref(prefs, 'fixed_position')
out, err = capsys.readouterr()
assert re.search(r'fixed_position: False', out, re.MULTILINE)
assert err == ''
@pytest.mark.unit
def test_main_getPref_invalid_field(capsys, reset_globals):
"""Test getPref() with an invalid field"""
@@ -1556,7 +1614,7 @@ def test_main_getPref_invalid_field(capsys, reset_globals):
@pytest.mark.unit
def test_main_setPref_valid_field(capsys, reset_globals):
def test_main_setPref_valid_field_int(capsys, reset_globals):
"""Test setPref() with a valid field"""
class Field:
@@ -1606,3 +1664,152 @@ def test_main_setPref_invalid_field(capsys, reset_globals):
# ensure they are sorted
assert re.search(r'fixed_position\s+is_router\s+ls_secs', out, re.MULTILINE)
assert err == ''
@pytest.mark.unit
def test_main_ch_set_psk_no_ch_index(capsys, reset_globals):
"""Test --ch-set psk """
sys.argv = ['', '--ch-set', 'psk', 'foo', '--host', 'meshtastic.local']
Globals.getInstance().set_args(sys.argv)
iface = MagicMock(autospec=TCPInterface)
with patch('meshtastic.tcp_interface.TCPInterface', return_value=iface) as mo:
with pytest.raises(SystemExit) as pytest_wrapped_e:
main()
out, err = capsys.readouterr()
assert re.search(r'Connected to radio', out, re.MULTILINE)
assert re.search(r"Warning: Need to specify '--ch-index'", out, re.MULTILINE)
assert err == ''
assert pytest_wrapped_e.type == SystemExit
assert pytest_wrapped_e.value.code == 1
mo.assert_called()
@pytest.mark.unit
def test_main_ch_set_psk_with_ch_index(capsys, reset_globals):
"""Test --ch-set psk """
sys.argv = ['', '--ch-set', 'psk', 'foo', '--host', 'meshtastic.local', '--ch-index', '0']
Globals.getInstance().set_args(sys.argv)
iface = MagicMock(autospec=TCPInterface)
with patch('meshtastic.tcp_interface.TCPInterface', return_value=iface) as mo:
main()
out, err = capsys.readouterr()
assert re.search(r'Connected to radio', out, re.MULTILINE)
assert re.search(r"Writing modified channels to device", out, re.MULTILINE)
assert err == ''
mo.assert_called()
@pytest.mark.unit
def test_main_ch_set_name_with_ch_index(capsys, reset_globals):
"""Test --ch-set setting other than psk"""
sys.argv = ['', '--ch-set', 'name', 'foo', '--host', 'meshtastic.local', '--ch-index', '0']
Globals.getInstance().set_args(sys.argv)
iface = MagicMock(autospec=TCPInterface)
with patch('meshtastic.tcp_interface.TCPInterface', return_value=iface) as mo:
main()
out, err = capsys.readouterr()
assert re.search(r'Connected to radio', out, re.MULTILINE)
assert re.search(r'Set name to foo', out, re.MULTILINE)
assert re.search(r"Writing modified channels to device", out, re.MULTILINE)
assert err == ''
mo.assert_called()
@pytest.mark.unit
def test_onNode(capsys, reset_globals):
"""Test onNode"""
onNode('foo')
out, err = capsys.readouterr()
assert re.search(r'Node changed', out, re.MULTILINE)
assert err == ''
@pytest.mark.unit
def test_tunnel_no_args(capsys, reset_globals):
"""Test tunnel no arguments"""
sys.argv = ['']
Globals.getInstance().set_args(sys.argv)
with pytest.raises(SystemExit) as pytest_wrapped_e:
tunnelMain()
assert pytest_wrapped_e.type == SystemExit
assert pytest_wrapped_e.value.code == 1
_, err = capsys.readouterr()
assert re.search(r'usage: ', err, re.MULTILINE)
@pytest.mark.unit
@patch('meshtastic.util.findPorts', return_value=[])
@patch('platform.system')
def test_tunnel_tunnel_arg_with_no_devices(mock_platform_system, patched_find_ports, caplog, capsys, reset_globals):
"""Test tunnel with tunnel arg (act like we are on a linux system)"""
a_mock = MagicMock()
a_mock.return_value = 'Linux'
mock_platform_system.side_effect = a_mock
sys.argv = ['', '--tunnel']
Globals.getInstance().set_args(sys.argv)
print(f'platform.system():{platform.system()}')
with caplog.at_level(logging.DEBUG):
with pytest.raises(SystemExit) as pytest_wrapped_e:
tunnelMain()
mock_platform_system.assert_called()
assert pytest_wrapped_e.type == SystemExit
assert pytest_wrapped_e.value.code == 1
out, err = capsys.readouterr()
assert re.search(r'Warning: No Meshtastic devices detected', out, re.MULTILINE)
assert err == ''
@pytest.mark.unit
@patch('meshtastic.util.findPorts', return_value=[])
@patch('platform.system')
def test_tunnel_subnet_arg_with_no_devices(mock_platform_system, patched_find_ports, caplog, capsys, reset_globals):
"""Test tunnel with subnet arg (act like we are on a linux system)"""
a_mock = MagicMock()
a_mock.return_value = 'Linux'
mock_platform_system.side_effect = a_mock
sys.argv = ['', '--subnet', 'foo']
Globals.getInstance().set_args(sys.argv)
print(f'platform.system():{platform.system()}')
with caplog.at_level(logging.DEBUG):
with pytest.raises(SystemExit) as pytest_wrapped_e:
tunnelMain()
mock_platform_system.assert_called()
assert pytest_wrapped_e.type == SystemExit
assert pytest_wrapped_e.value.code == 1
out, err = capsys.readouterr()
assert re.search(r'Warning: No Meshtastic devices detected', out, re.MULTILINE)
assert err == ''
@pytest.mark.unit
@patch('platform.system')
def test_tunnel_tunnel_arg(mock_platform_system, caplog, reset_globals, iface_with_nodes, capsys):
"""Test tunnel with tunnel arg (act like we are on a linux system)"""
# Override the time.sleep so there is no loop
def my_sleep(amount):
sys.exit(3)
a_mock = MagicMock()
a_mock.return_value = 'Linux'
mock_platform_system.side_effect = a_mock
sys.argv = ['', '--tunnel']
Globals.getInstance().set_args(sys.argv)
iface = iface_with_nodes
iface.myInfo.my_node_num = 2475227164
with caplog.at_level(logging.DEBUG):
with patch('meshtastic.serial_interface.SerialInterface', return_value=iface):
with patch('time.sleep', side_effect=my_sleep):
with pytest.raises(SystemExit) as pytest_wrapped_e:
tunnelMain()
mock_platform_system.assert_called()
assert pytest_wrapped_e.type == SystemExit
assert pytest_wrapped_e.value.code == 3
assert re.search(r'Not starting Tunnel', caplog.text, re.MULTILINE)
out, err = capsys.readouterr()
assert re.search(r'Connected to radio', out, re.MULTILINE)
assert err == ''

View File

@@ -10,6 +10,8 @@ from ..mesh_interface import MeshInterface
from ..node import Node
from .. import mesh_pb2
from ..__init__ import LOCAL_ADDR, BROADCAST_ADDR
from ..radioconfig_pb2 import RadioConfig
from ..util import Timeout
@pytest.mark.unit
@@ -57,10 +59,8 @@ def test_MeshInterface(capsys, reset_globals):
def test_getMyUser(reset_globals, iface_with_nodes):
"""Test getMyUser()"""
iface = iface_with_nodes
iface.myInfo.my_node_num = 2475227164
myuser = iface.getMyUser()
print(f'myuser:{myuser}')
assert myuser is not None
assert myuser["id"] == '!9388f81c'
@@ -166,6 +166,22 @@ def test_sendPosition(reset_globals, caplog):
assert re.search(r'p.time:', caplog.text, re.MULTILINE)
@pytest.mark.unit
def test_close_with_heartbeatTimer(reset_globals, caplog):
"""Test close() with heartbeatTimer"""
iface = MeshInterface(noProto=True)
anode = Node('foo', 'bar')
radioConfig = RadioConfig()
radioConfig.preferences.phone_timeout_secs = 10
anode.radioConfig = radioConfig
iface.localNode = anode
assert iface.heartbeatTimer is None
with caplog.at_level(logging.DEBUG):
iface._startHeartbeat()
assert iface.heartbeatTimer is not None
iface.close()
@pytest.mark.unit
def test_handleFromRadio_empty_payload(reset_globals, caplog):
"""Test _handleFromRadio"""
@@ -460,3 +476,155 @@ def test_generatePacketId(capsys, reset_globals):
assert re.search(r'Not connected yet, can not generate packet', out, re.MULTILINE)
assert err == ''
assert pytest_wrapped_e.type == Exception
@pytest.mark.unit
def test_fixupPosition_empty_pos(capsys, reset_globals):
"""Test _fixupPosition()"""
iface = MeshInterface(noProto=True)
pos = {}
newpos = iface._fixupPosition(pos)
assert newpos == pos
@pytest.mark.unit
def test_fixupPosition_no_changes_needed(capsys, reset_globals):
"""Test _fixupPosition()"""
iface = MeshInterface(noProto=True)
pos = {"latitude": 101, "longitude": 102}
newpos = iface._fixupPosition(pos)
assert newpos == pos
@pytest.mark.unit
def test_fixupPosition(capsys, reset_globals):
"""Test _fixupPosition()"""
iface = MeshInterface(noProto=True)
pos = {"latitudeI": 1010000000, "longitudeI": 1020000000}
newpos = iface._fixupPosition(pos)
assert newpos == {"latitude": 101.0,
"latitudeI": 1010000000,
"longitude": 102.0,
"longitudeI": 1020000000}
@pytest.mark.unit
def test_nodeNumToId(capsys, reset_globals, iface_with_nodes):
"""Test _nodeNumToId()"""
iface = iface_with_nodes
iface.myInfo.my_node_num = 2475227164
someid = iface._nodeNumToId(2475227164)
assert someid == '!9388f81c'
@pytest.mark.unit
def test_nodeNumToId_not_found(capsys, reset_globals, iface_with_nodes):
"""Test _nodeNumToId()"""
iface = iface_with_nodes
iface.myInfo.my_node_num = 2475227164
someid = iface._nodeNumToId(123)
assert someid is None
@pytest.mark.unit
def test_nodeNumToId_to_all(capsys, reset_globals, iface_with_nodes):
"""Test _nodeNumToId()"""
iface = iface_with_nodes
iface.myInfo.my_node_num = 2475227164
someid = iface._nodeNumToId(0xffffffff)
assert someid == '^all'
@pytest.mark.unit
def test_getOrCreateByNum_minimal(capsys, reset_globals, iface_with_nodes):
"""Test _getOrCreateByNum()"""
iface = iface_with_nodes
iface.myInfo.my_node_num = 2475227164
tmp = iface._getOrCreateByNum(123)
assert tmp == {'num': 123}
@pytest.mark.unit
def test_getOrCreateByNum_not_found(capsys, reset_globals, iface_with_nodes):
"""Test _getOrCreateByNum()"""
iface = iface_with_nodes
iface.myInfo.my_node_num = 2475227164
with pytest.raises(Exception) as pytest_wrapped_e:
iface._getOrCreateByNum(0xffffffff)
assert pytest_wrapped_e.type == Exception
@pytest.mark.unit
def test_getOrCreateByNum(capsys, reset_globals, iface_with_nodes):
"""Test _getOrCreateByNum()"""
iface = iface_with_nodes
iface.myInfo.my_node_num = 2475227164
tmp = iface._getOrCreateByNum(2475227164)
assert tmp['num'] == 2475227164
@pytest.mark.unit
def test_enter():
"""Test __enter__()"""
iface = MeshInterface(noProto=True)
assert iface == iface.__enter__()
@pytest.mark.unit
def test_exit_with_exception(caplog):
"""Test __exit__()"""
iface = MeshInterface(noProto=True)
with caplog.at_level(logging.ERROR):
iface.__exit__('foo', 'bar', 'baz')
assert re.search(r'An exception of type foo with value bar has occurred', caplog.text, re.MULTILINE)
assert re.search(r'Traceback: baz', caplog.text, re.MULTILINE)
@pytest.mark.unit
def test_showNodes_exclude_self(capsys, caplog, reset_globals, iface_with_nodes):
"""Test that we hit that continue statement"""
with caplog.at_level(logging.DEBUG):
iface = iface_with_nodes
iface.localNode.nodeNum = 2475227164
iface.showNodes()
iface.showNodes(includeSelf=False)
capsys.readouterr()
@pytest.mark.unitslow
def test_waitForConfig(caplog, capsys):
"""Test waitForConfig()"""
iface = MeshInterface(noProto=True)
# override how long to wait
iface._timeout = Timeout(0.01)
with pytest.raises(Exception) as pytest_wrapped_e:
iface.waitForConfig()
assert pytest_wrapped_e.type == Exception
out, err = capsys.readouterr()
assert re.search(r'Exception: Timed out waiting for interface config', err, re.MULTILINE)
assert out == ''
@pytest.mark.unit
def test_waitConnected_raises_an_exception(caplog, capsys):
"""Test waitConnected()"""
iface = MeshInterface(noProto=True)
with pytest.raises(Exception) as pytest_wrapped_e:
iface.failure = "warn about something"
iface._waitConnected(0.01)
assert pytest_wrapped_e.type == Exception
out, err = capsys.readouterr()
assert re.search(r'warn about something', err, re.MULTILINE)
assert out == ''
@pytest.mark.unit
def test_waitConnected_isConnected_timeout(caplog, capsys):
"""Test waitConnected()"""
with pytest.raises(Exception) as pytest_wrapped_e:
iface = MeshInterface()
iface._waitConnected(0.01)
assert pytest_wrapped_e.type == Exception
out, err = capsys.readouterr()
assert re.search(r'warn about something', err, re.MULTILINE)
assert out == ''

View File

@@ -11,6 +11,7 @@ from ..serial_interface import SerialInterface
from ..admin_pb2 import AdminMessage
from ..channel_pb2 import Channel
from ..radioconfig_pb2 import RadioConfig
from ..util import Timeout
@pytest.mark.unit
@@ -29,7 +30,7 @@ def test_node(capsys):
@pytest.mark.unit
def test_node_reqquestConfig():
def test_node_requestConfig(capsys):
"""Test run requestConfig"""
iface = MagicMock(autospec=SerialInterface)
amesg = MagicMock(autospec=AdminMessage)
@@ -37,6 +38,9 @@ def test_node_reqquestConfig():
with patch('meshtastic.admin_pb2.AdminMessage', return_value=amesg):
anode = Node(mo, 'bar')
anode.requestConfig()
out, err = capsys.readouterr()
assert re.search(r'Requesting preferences from remote node', out, re.MULTILINE)
assert err == ''
@pytest.mark.unit
@@ -87,6 +91,16 @@ def test_setOwner_no_short_name_and_long_name_has_words(caplog):
assert re.search(r'p.set_owner.team:0', caplog.text, re.MULTILINE)
@pytest.mark.unit
def test_setOwner_long_name_no_short(caplog):
"""Test setOwner"""
anode = Node('foo', 'bar', noProto=True)
with caplog.at_level(logging.DEBUG):
anode.setOwner(long_name ='Aabo', is_licensed=True)
assert re.search(r'p.set_owner.long_name:Aabo:', caplog.text, re.MULTILINE)
assert re.search(r'p.set_owner.short_name:Aab:', caplog.text, re.MULTILINE)
@pytest.mark.unit
def test_exitSimulator(caplog):
"""Test exitSimulator"""
@@ -106,13 +120,16 @@ def test_reboot(caplog):
@pytest.mark.unit
def test_setURL_empty_url():
def test_setURL_empty_url(capsys):
"""Test reboot"""
anode = Node('foo', 'bar', noProto=True)
with pytest.raises(SystemExit) as pytest_wrapped_e:
anode.setURL('')
assert pytest_wrapped_e.type == SystemExit
assert pytest_wrapped_e.value.code == 1
out, err = capsys.readouterr()
assert re.search(r'Warning: No RadioConfig has been read', out, re.MULTILINE)
assert err == ''
@pytest.mark.unit
@@ -133,7 +150,7 @@ def test_setURL_valid_URL(caplog):
@pytest.mark.unit
def test_setURL_valid_URL_but_no_settings(caplog):
def test_setURL_valid_URL_but_no_settings(caplog, capsys):
"""Test setURL"""
iface = MagicMock(autospec=SerialInterface)
url = "https://www.meshtastic.org/d/#"
@@ -143,6 +160,9 @@ def test_setURL_valid_URL_but_no_settings(caplog):
anode.setURL(url)
assert pytest_wrapped_e.type == SystemExit
assert pytest_wrapped_e.value.code == 1
out, err = capsys.readouterr()
assert re.search(r'Warning: There were no settings', out, re.MULTILINE)
assert err == ''
@pytest.mark.unit
@@ -623,7 +643,7 @@ def test_writeConfig(caplog):
@pytest.mark.unit
def test_requestChannel_not_localNode(caplog):
def test_requestChannel_not_localNode(caplog, capsys):
"""Test _requestChannel()"""
iface = MagicMock(autospec=SerialInterface)
with patch('meshtastic.serial_interface.SerialInterface', return_value=iface) as mo:
@@ -633,6 +653,9 @@ def test_requestChannel_not_localNode(caplog):
with caplog.at_level(logging.DEBUG):
anode._requestChannel(0)
assert re.search(r'Requesting channel 0 info from remote node', caplog.text, re.MULTILINE)
out, err = capsys.readouterr()
assert re.search(r'Requesting channel 0 info', out, re.MULTILINE)
assert err == ''
@pytest.mark.unit
@@ -857,3 +880,14 @@ def test_onResponseRequestSetting_with_error(capsys):
out, err = capsys.readouterr()
assert re.search(r'Error on response', out)
assert err == ''
@pytest.mark.unitslow
def test_waitForConfig():
"""Test waitForConfig()"""
anode = Node('foo', 'bar')
radioConfig = RadioConfig()
anode.radioConfig = radioConfig
anode._timeout = Timeout(0.01)
result = anode.waitForConfig()
assert not result

View File

@@ -79,7 +79,7 @@ def test_watchGPIOs(caplog):
@pytest.mark.unit
def test_sendHardware_no_nodeid():
def test_sendHardware_no_nodeid(capsys):
"""Test sending no nodeid to _sendHardware()"""
iface = MagicMock(autospec=SerialInterface)
with patch('meshtastic.serial_interface.SerialInterface', return_value=iface) as mo:
@@ -87,3 +87,6 @@ def test_sendHardware_no_nodeid():
rhw = RemoteHardwareClient(mo)
rhw._sendHardware(None, None)
assert pytest_wrapped_e.type == SystemExit
out, err = capsys.readouterr()
assert re.search(r'Warning: Must use a destination node ID', out)
assert err == ''

View File

@@ -3,15 +3,19 @@
import re
from unittest.mock import patch
from unittest.mock import patch, mock_open
import pytest
from ..serial_interface import SerialInterface
@pytest.mark.unit
@patch("time.sleep")
@patch("termios.tcsetattr")
@patch("termios.tcgetattr")
@patch("builtins.open", new_callable=mock_open, read_data="data")
@patch('serial.Serial')
@patch('meshtastic.util.findPorts', return_value=['/dev/ttyUSBfake'])
def test_SerialInterface_single_port(mocked_findPorts, mocked_serial):
def test_SerialInterface_single_port(mocked_findPorts, mocked_serial, mocked_open, mock_get, mock_set, mock_sleep, capsys):
"""Test that we can instantiate a SerialInterface with a single port"""
iface = SerialInterface(noProto=True)
iface.showInfo()
@@ -19,6 +23,12 @@ def test_SerialInterface_single_port(mocked_findPorts, mocked_serial):
iface.close()
mocked_findPorts.assert_called()
mocked_serial.assert_called()
out, err = capsys.readouterr()
assert re.search(r'Nodes in mesh', out, re.MULTILINE)
assert re.search(r'Preferences', out, re.MULTILINE)
assert re.search(r'Channels', out, re.MULTILINE)
assert re.search(r'Primary channel', out, re.MULTILINE)
assert err == ''
@pytest.mark.unit

View File

@@ -0,0 +1,706 @@
"""Meshtastic smoke tests with a single virtual device via localhost.
During the CI build of the Meshtastic-device, a build.zip file is created.
Inside that build.zip is a standalone executable meshtasticd_linux_amd64.
That linux executable will simulate a Meshtastic device listening on localhost.
This smoke test runs against that localhost.
"""
import re
import subprocess
import time
import platform
import os
# Do not like using hard coded sleeps, but it probably makes
# sense to pause for the radio at apprpriate times
import pytest
from ..util import findPorts
# seconds to pause after running a meshtastic command
PAUSE_AFTER_COMMAND = 0.1
PAUSE_AFTER_REBOOT = 0.1
#TODO: need to fix the virtual device to have a reboot. When you issue the command
# below, you get "FIXME implement reboot for this platform"
#@pytest.mark.smokevirt
#def test_smokevirt_reboot():
# """Test reboot"""
# return_value, _ = subprocess.getstatusoutput('meshtastic --host localhost --reboot')
# assert return_value == 0
# # pause for the radio to reset
# time.sleep(8)
@pytest.mark.smokevirt
def test_smokevirt_info():
"""Test --info"""
return_value, out = subprocess.getstatusoutput('meshtastic --host localhost --info')
assert re.match(r'Connected to radio', out)
assert re.search(r'^Owner', out, re.MULTILINE)
assert re.search(r'^My info', out, re.MULTILINE)
assert re.search(r'^Nodes in mesh', out, re.MULTILINE)
assert re.search(r'^Preferences', out, re.MULTILINE)
assert re.search(r'^Channels', out, re.MULTILINE)
assert re.search(r'^ PRIMARY', out, re.MULTILINE)
assert re.search(r'^Primary channel URL', out, re.MULTILINE)
assert return_value == 0
@pytest.mark.smokevirt
def test_smokevirt_sendping():
"""Test --sendping"""
return_value, out = subprocess.getstatusoutput('meshtastic --host localhost --sendping')
assert re.match(r'Connected to radio', out)
assert re.search(r'^Sending ping message', out, re.MULTILINE)
assert return_value == 0
@pytest.mark.smokevirt
def test_get_with_invalid_setting():
"""Test '--get a_bad_setting'."""
return_value, out = subprocess.getstatusoutput('meshtastic --host localhost --get a_bad_setting')
assert re.search(r'Choices in sorted order', out)
assert return_value == 0
@pytest.mark.smokevirt
def test_set_with_invalid_setting():
"""Test '--set a_bad_setting'."""
return_value, out = subprocess.getstatusoutput('meshtastic --host localhost --set a_bad_setting foo')
assert re.search(r'Choices in sorted order', out)
assert return_value == 0
@pytest.mark.smokevirt
def test_ch_set_with_invalid_settingpatch_find_ports():
"""Test '--ch-set with a_bad_setting'."""
return_value, out = subprocess.getstatusoutput('meshtastic --host localhost --ch-set invalid_setting foo --ch-index 0')
assert re.search(r'Choices in sorted order', out)
assert return_value == 0
@pytest.mark.smokevirt
def test_smokevirt_pos_fields():
"""Test --pos-fields (with some values POS_ALTITUDE POS_ALT_MSL POS_BATTERY)"""
return_value, out = subprocess.getstatusoutput('meshtastic --host localhost --pos-fields POS_ALTITUDE POS_ALT_MSL POS_BATTERY')
assert re.match(r'Connected to radio', out)
assert re.search(r'^Setting position fields to 35', out, re.MULTILINE)
assert return_value == 0
# pause for the radio
time.sleep(PAUSE_AFTER_COMMAND)
return_value, out = subprocess.getstatusoutput('meshtastic --host localhost --pos-fields')
assert re.match(r'Connected to radio', out)
assert re.search(r'POS_ALTITUDE', out, re.MULTILINE)
assert re.search(r'POS_ALT_MSL', out, re.MULTILINE)
assert re.search(r'POS_BATTERY', out, re.MULTILINE)
assert return_value == 0
@pytest.mark.smokevirt
def test_smokevirt_test_with_arg_but_no_hardware():
"""Test --test
Note: Since only one device is connected, it will not do much.
"""
return_value, out = subprocess.getstatusoutput('meshtastic --host localhost --test')
assert re.search(r'^Warning: Must have at least two devices', out, re.MULTILINE)
assert return_value == 1
@pytest.mark.smokevirt
def test_smokevirt_debug():
"""Test --debug"""
return_value, out = subprocess.getstatusoutput('meshtastic --host localhost --info --debug')
assert re.search(r'^Owner', out, re.MULTILINE)
assert re.search(r'^DEBUG file', out, re.MULTILINE)
assert return_value == 0
@pytest.mark.smokevirt
def test_smokevirt_seriallog_to_file():
"""Test --seriallog to a file creates a file"""
filename = 'tmpoutput.txt'
if os.path.exists(f"{filename}"):
os.remove(f"{filename}")
return_value, _ = subprocess.getstatusoutput(f'meshtastic --host localhost --info --seriallog {filename}')
assert os.path.exists(f"{filename}")
assert return_value == 0
os.remove(f"{filename}")
@pytest.mark.smokevirt
def test_smokevirt_qr():
"""Test --qr"""
filename = 'tmpqr'
if os.path.exists(f"{filename}"):
os.remove(f"{filename}")
return_value, _ = subprocess.getstatusoutput(f'meshtastic --host localhost --qr > {filename}')
assert os.path.exists(f"{filename}")
# not really testing that a valid qr code is created, just that the file size
# is reasonably big enough for a qr code
assert os.stat(f"{filename}").st_size > 20000
assert return_value == 0
os.remove(f"{filename}")
@pytest.mark.smokevirt
def test_smokevirt_nodes():
"""Test --nodes"""
return_value, out = subprocess.getstatusoutput('meshtastic --host localhost --nodes')
assert re.match(r'Connected to radio', out)
if platform.system() != 'Windows':
assert re.search(r' User ', out, re.MULTILINE)
assert re.search(r' 1 ', out, re.MULTILINE)
assert return_value == 0
@pytest.mark.smokevirt
def test_smokevirt_send_hello():
"""Test --sendtext hello"""
return_value, out = subprocess.getstatusoutput('meshtastic --host localhost --sendtext hello')
assert re.match(r'Connected to radio', out)
assert re.search(r'^Sending text message hello to \^all', out, re.MULTILINE)
assert return_value == 0
@pytest.mark.smokevirt
def test_smokevirt_port():
"""Test --port"""
# first, get the ports
ports = findPorts()
# hopefully there is none
assert len(ports) == 0
@pytest.mark.smokevirt
def test_smokevirt_set_is_router_true():
"""Test --set is_router true"""
return_value, out = subprocess.getstatusoutput('meshtastic --host localhost --set is_router true')
assert re.match(r'Connected to radio', out)
assert re.search(r'^Set is_router to true', out, re.MULTILINE)
assert return_value == 0
# pause for the radio
time.sleep(PAUSE_AFTER_COMMAND)
return_value, out = subprocess.getstatusoutput('meshtastic --host localhost --get is_router')
assert re.search(r'^is_router: True', out, re.MULTILINE)
assert return_value == 0
@pytest.mark.smokevirt
def test_smokevirt_set_location_info():
"""Test --setlat, --setlon and --setalt """
return_value, out = subprocess.getstatusoutput('meshtastic --host localhost --setlat 32.7767 --setlon -96.7970 --setalt 1337')
assert re.match(r'Connected to radio', out)
assert re.search(r'^Fixing altitude', out, re.MULTILINE)
assert re.search(r'^Fixing latitude', out, re.MULTILINE)
assert re.search(r'^Fixing longitude', out, re.MULTILINE)
assert return_value == 0
# pause for the radio
time.sleep(PAUSE_AFTER_COMMAND)
return_value, out2 = subprocess.getstatusoutput('meshtastic --host localhost --info')
assert re.search(r'1337', out2, re.MULTILINE)
assert re.search(r'32.7767', out2, re.MULTILINE)
assert re.search(r'-96.797', out2, re.MULTILINE)
assert return_value == 0
@pytest.mark.smokevirt
def test_smokevirt_set_is_router_false():
"""Test --set is_router false"""
return_value, out = subprocess.getstatusoutput('meshtastic --host localhost --set is_router false')
assert re.match(r'Connected to radio', out)
assert re.search(r'^Set is_router to false', out, re.MULTILINE)
assert return_value == 0
# pause for the radio
time.sleep(PAUSE_AFTER_COMMAND)
return_value, out = subprocess.getstatusoutput('meshtastic --host localhost --get is_router')
assert re.search(r'^is_router: False', out, re.MULTILINE)
assert return_value == 0
@pytest.mark.smokevirt
def test_smokevirt_set_owner():
"""Test --set-owner name"""
# make sure the owner is not Joe
return_value, out = subprocess.getstatusoutput('meshtastic --host localhost --set-owner Bob')
assert re.match(r'Connected to radio', out)
assert re.search(r'^Setting device owner to Bob', out, re.MULTILINE)
assert return_value == 0
# pause for the radio
time.sleep(PAUSE_AFTER_COMMAND)
return_value, out = subprocess.getstatusoutput('meshtastic --host localhost --info')
assert not re.search(r'Owner: Joe', out, re.MULTILINE)
assert return_value == 0
# pause for the radio
time.sleep(PAUSE_AFTER_COMMAND)
return_value, out = subprocess.getstatusoutput('meshtastic --host localhost --set-owner Joe')
assert re.match(r'Connected to radio', out)
assert re.search(r'^Setting device owner to Joe', out, re.MULTILINE)
assert return_value == 0
# pause for the radio
time.sleep(PAUSE_AFTER_COMMAND)
return_value, out = subprocess.getstatusoutput('meshtastic --host localhost --info')
assert re.search(r'Owner: Joe', out, re.MULTILINE)
assert return_value == 0
@pytest.mark.smokevirt
def test_smokevirt_set_team():
"""Test --set-team """
# unset the team
return_value, out = subprocess.getstatusoutput('meshtastic --host localhost --set-team CLEAR')
assert re.match(r'Connected to radio', out)
assert re.search(r'^Setting team to CLEAR', out, re.MULTILINE)
assert return_value == 0
# pause for the radio
time.sleep(PAUSE_AFTER_REBOOT)
return_value, out = subprocess.getstatusoutput('meshtastic --host localhost --set-team CYAN')
assert re.search(r'Setting team to CYAN', out, re.MULTILINE)
assert return_value == 0
# pause for the radio
time.sleep(PAUSE_AFTER_REBOOT)
return_value, out = subprocess.getstatusoutput('meshtastic --host localhost --info')
assert re.search(r'CYAN', out, re.MULTILINE)
assert return_value == 0
@pytest.mark.smokevirt
def test_smokevirt_ch_values():
"""Test --ch-longslow, --ch-longfast, --ch-mediumslow, --ch-mediumsfast,
--ch-shortslow, and --ch-shortfast arguments
"""
exp = {
'--ch-longslow': 'Bw125Cr48Sf4096',
'--ch-longfast': 'Bw31_25Cr48Sf512',
'--ch-mediumslow': 'Bw250Cr46Sf2048',
'--ch-mediumfast': 'Bw250Cr47Sf1024',
'--ch-shortslow': '{ "psk',
'--ch-shortfast': 'Bw500Cr45Sf128'
}
for key, val in exp.items():
return_value, out = subprocess.getstatusoutput(f'meshtastic --host localhost {key}')
assert re.match(r'Connected to radio', out)
assert re.search(r'Writing modified channels to device', out, re.MULTILINE)
assert return_value == 0
# pause for the radio (might reboot)
time.sleep(PAUSE_AFTER_REBOOT)
return_value, out = subprocess.getstatusoutput('meshtastic --host localhost --info')
assert re.search(val, out, re.MULTILINE)
assert return_value == 0
# pause for the radio
time.sleep(PAUSE_AFTER_COMMAND)
@pytest.mark.smokevirt
def test_smokevirt_ch_set_name():
"""Test --ch-set name"""
return_value, out = subprocess.getstatusoutput('meshtastic --host localhost --info')
assert not re.search(r'MyChannel', out, re.MULTILINE)
assert return_value == 0
# pause for the radio
time.sleep(PAUSE_AFTER_COMMAND)
return_value, out = subprocess.getstatusoutput('meshtastic --host localhost --ch-set name MyChannel')
assert re.match(r'Connected to radio', out)
assert re.search(r'Warning: Need to specify', out, re.MULTILINE)
assert return_value == 1
# pause for the radio
time.sleep(PAUSE_AFTER_COMMAND)
return_value, out = subprocess.getstatusoutput('meshtastic --host localhost --ch-set name MyChannel --ch-index 0')
assert re.match(r'Connected to radio', out)
assert re.search(r'^Set name to MyChannel', out, re.MULTILINE)
assert return_value == 0
# pause for the radio
time.sleep(PAUSE_AFTER_COMMAND)
return_value, out = subprocess.getstatusoutput('meshtastic --host localhost --info')
assert re.search(r'MyChannel', out, re.MULTILINE)
assert return_value == 0
@pytest.mark.smokevirt
def test_smokevirt_ch_set_downlink_and_uplink():
"""Test -ch-set downlink_enabled X and --ch-set uplink_enabled X"""
return_value, out = subprocess.getstatusoutput('meshtastic --host localhost --ch-set downlink_enabled false --ch-set uplink_enabled false')
assert re.match(r'Connected to radio', out)
assert re.search(r'Warning: Need to specify', out, re.MULTILINE)
assert return_value == 1
# pause for the radio
time.sleep(PAUSE_AFTER_COMMAND)
# pylint: disable=C0301
return_value, out = subprocess.getstatusoutput('meshtastic --host localhost --ch-set downlink_enabled false --ch-set uplink_enabled false --ch-index 0')
assert re.match(r'Connected to radio', out)
assert return_value == 0
# pause for the radio
time.sleep(PAUSE_AFTER_COMMAND)
return_value, out = subprocess.getstatusoutput('meshtastic --host localhost --info')
assert not re.search(r'uplinkEnabled', out, re.MULTILINE)
assert not re.search(r'downlinkEnabled', out, re.MULTILINE)
assert return_value == 0
# pause for the radio
time.sleep(PAUSE_AFTER_COMMAND)
# pylint: disable=C0301
return_value, out = subprocess.getstatusoutput('meshtastic --host localhost --ch-set downlink_enabled true --ch-set uplink_enabled true --ch-index 0')
assert re.match(r'Connected to radio', out)
assert re.search(r'^Set downlink_enabled to true', out, re.MULTILINE)
assert re.search(r'^Set uplink_enabled to true', out, re.MULTILINE)
assert return_value == 0
# pause for the radio
time.sleep(PAUSE_AFTER_COMMAND)
return_value, out = subprocess.getstatusoutput('meshtastic --host localhost --info')
assert re.search(r'uplinkEnabled', out, re.MULTILINE)
assert re.search(r'downlinkEnabled', out, re.MULTILINE)
assert return_value == 0
@pytest.mark.smokevirt
def test_smokevirt_ch_add_and_ch_del():
"""Test --ch-add"""
return_value, out = subprocess.getstatusoutput('meshtastic --host localhost --ch-add testing')
assert re.search(r'Writing modified channels to device', out, re.MULTILINE)
assert return_value == 0
# pause for the radio
time.sleep(PAUSE_AFTER_COMMAND)
return_value, out = subprocess.getstatusoutput('meshtastic --host localhost --info')
assert re.match(r'Connected to radio', out)
assert re.search(r'SECONDARY', out, re.MULTILINE)
assert re.search(r'testing', out, re.MULTILINE)
assert return_value == 0
# pause for the radio
time.sleep(PAUSE_AFTER_COMMAND)
return_value, out = subprocess.getstatusoutput('meshtastic --host localhost --ch-index 1 --ch-del')
assert re.search(r'Deleting channel 1', out, re.MULTILINE)
assert return_value == 0
# pause for the radio
time.sleep(PAUSE_AFTER_REBOOT)
# make sure the secondar channel is not there
return_value, out = subprocess.getstatusoutput('meshtastic --host localhost --info')
assert re.match(r'Connected to radio', out)
assert not re.search(r'SECONDARY', out, re.MULTILINE)
assert not re.search(r'testing', out, re.MULTILINE)
assert return_value == 0
@pytest.mark.smokevirt
def test_smokevirt_ch_enable_and_disable():
"""Test --ch-enable and --ch-disable"""
return_value, out = subprocess.getstatusoutput('meshtastic --host localhost --ch-add testing')
assert re.search(r'Writing modified channels to device', out, re.MULTILINE)
assert return_value == 0
# pause for the radio
time.sleep(PAUSE_AFTER_COMMAND)
return_value, out = subprocess.getstatusoutput('meshtastic --host localhost --info')
assert re.match(r'Connected to radio', out)
assert re.search(r'SECONDARY', out, re.MULTILINE)
assert re.search(r'testing', out, re.MULTILINE)
assert return_value == 0
# pause for the radio
time.sleep(PAUSE_AFTER_COMMAND)
# ensure they need to specify a --ch-index
return_value, out = subprocess.getstatusoutput('meshtastic --host localhost --ch-disable')
assert return_value == 1
# pause for the radio
time.sleep(PAUSE_AFTER_COMMAND)
return_value, out = subprocess.getstatusoutput('meshtastic --host localhost --ch-disable --ch-index 1')
assert return_value == 0
# pause for the radio
time.sleep(PAUSE_AFTER_COMMAND)
return_value, out = subprocess.getstatusoutput('meshtastic --host localhost --info')
assert re.match(r'Connected to radio', out)
assert re.search(r'DISABLED', out, re.MULTILINE)
assert re.search(r'testing', out, re.MULTILINE)
assert return_value == 0
# pause for the radio
time.sleep(PAUSE_AFTER_COMMAND)
return_value, out = subprocess.getstatusoutput('meshtastic --host localhost --ch-enable --ch-index 1')
assert return_value == 0
# pause for the radio
time.sleep(PAUSE_AFTER_COMMAND)
return_value, out = subprocess.getstatusoutput('meshtastic --host localhost --info')
assert re.match(r'Connected to radio', out)
assert re.search(r'SECONDARY', out, re.MULTILINE)
assert re.search(r'testing', out, re.MULTILINE)
assert return_value == 0
# pause for the radio
time.sleep(PAUSE_AFTER_COMMAND)
return_value, out = subprocess.getstatusoutput('meshtastic --host localhost --ch-del --ch-index 1')
assert return_value == 0
# pause for the radio
time.sleep(PAUSE_AFTER_COMMAND)
@pytest.mark.smokevirt
def test_smokevirt_ch_del_a_disabled_non_primary_channel():
"""Test --ch-del will work on a disabled non-primary channel."""
return_value, out = subprocess.getstatusoutput('meshtastic --host localhost --ch-add testing')
assert re.search(r'Writing modified channels to device', out, re.MULTILINE)
assert return_value == 0
# pause for the radio
time.sleep(PAUSE_AFTER_COMMAND)
return_value, out = subprocess.getstatusoutput('meshtastic --host localhost --info')
assert re.match(r'Connected to radio', out)
assert re.search(r'SECONDARY', out, re.MULTILINE)
assert re.search(r'testing', out, re.MULTILINE)
assert return_value == 0
# pause for the radio
time.sleep(PAUSE_AFTER_COMMAND)
# ensure they need to specify a --ch-index
return_value, out = subprocess.getstatusoutput('meshtastic --host localhost --ch-disable')
assert return_value == 1
# pause for the radio
time.sleep(PAUSE_AFTER_COMMAND)
return_value, out = subprocess.getstatusoutput('meshtastic --host localhost --ch-del --ch-index 1')
assert return_value == 0
# pause for the radio
time.sleep(PAUSE_AFTER_COMMAND)
return_value, out = subprocess.getstatusoutput('meshtastic --host localhost --info')
assert re.match(r'Connected to radio', out)
assert not re.search(r'DISABLED', out, re.MULTILINE)
assert not re.search(r'SECONDARY', out, re.MULTILINE)
assert not re.search(r'testing', out, re.MULTILINE)
assert return_value == 0
# pause for the radio
time.sleep(PAUSE_AFTER_COMMAND)
@pytest.mark.smokevirt
def test_smokevirt_attempt_to_delete_primary_channel():
"""Test that we cannot delete the PRIMARY channel."""
return_value, out = subprocess.getstatusoutput('meshtastic --host localhost --ch-del --ch-index 0')
assert re.search(r'Warning: Cannot delete primary channel', out, re.MULTILINE)
assert return_value == 1
# pause for the radio
time.sleep(PAUSE_AFTER_COMMAND)
@pytest.mark.smokevirt
def test_smokevirt_attempt_to_disable_primary_channel():
"""Test that we cannot disable the PRIMARY channel."""
return_value, out = subprocess.getstatusoutput('meshtastic --host localhost --ch-disable --ch-index 0')
assert re.search(r'Warning: Cannot enable', out, re.MULTILINE)
assert return_value == 1
# pause for the radio
time.sleep(PAUSE_AFTER_COMMAND)
@pytest.mark.smokevirt
def test_smokevirt_attempt_to_enable_primary_channel():
"""Test that we cannot enable the PRIMARY channel."""
return_value, out = subprocess.getstatusoutput('meshtastic --host localhost --ch-enable --ch-index 0')
assert re.search(r'Warning: Cannot enable', out, re.MULTILINE)
assert return_value == 1
# pause for the radio
time.sleep(PAUSE_AFTER_COMMAND)
@pytest.mark.smokevirt
def test_smokevirt_ensure_ch_del_second_of_three_channels():
"""Test that when we delete the 2nd of 3 channels, that it deletes the correct channel."""
return_value, out = subprocess.getstatusoutput('meshtastic --host localhost --ch-add testing1')
assert re.match(r'Connected to radio', out)
assert return_value == 0
# pause for the radio
time.sleep(PAUSE_AFTER_COMMAND)
return_value, out = subprocess.getstatusoutput('meshtastic --host localhost --info')
assert re.match(r'Connected to radio', out)
assert re.search(r'SECONDARY', out, re.MULTILINE)
assert re.search(r'testing1', out, re.MULTILINE)
assert return_value == 0
# pause for the radio
time.sleep(PAUSE_AFTER_COMMAND)
return_value, out = subprocess.getstatusoutput('meshtastic --host localhost --ch-add testing2')
assert re.match(r'Connected to radio', out)
assert return_value == 0
# pause for the radio
time.sleep(PAUSE_AFTER_COMMAND)
return_value, out = subprocess.getstatusoutput('meshtastic --host localhost --info')
assert re.match(r'Connected to radio', out)
assert re.search(r'testing2', out, re.MULTILINE)
assert return_value == 0
# pause for the radio
time.sleep(PAUSE_AFTER_COMMAND)
return_value, out = subprocess.getstatusoutput('meshtastic --host localhost --ch-del --ch-index 1')
assert re.match(r'Connected to radio', out)
assert return_value == 0
# pause for the radio
time.sleep(PAUSE_AFTER_COMMAND)
return_value, out = subprocess.getstatusoutput('meshtastic --host localhost --info')
assert re.match(r'Connected to radio', out)
assert re.search(r'testing2', out, re.MULTILINE)
assert return_value == 0
# pause for the radio
time.sleep(PAUSE_AFTER_COMMAND)
return_value, out = subprocess.getstatusoutput('meshtastic --host localhost --ch-del --ch-index 1')
assert re.match(r'Connected to radio', out)
assert return_value == 0
# pause for the radio
time.sleep(PAUSE_AFTER_COMMAND)
@pytest.mark.smokevirt
def test_smokevirt_ensure_ch_del_third_of_three_channels():
"""Test that when we delete the 3rd of 3 channels, that it deletes the correct channel."""
return_value, out = subprocess.getstatusoutput('meshtastic --host localhost --ch-add testing1')
assert re.match(r'Connected to radio', out)
assert return_value == 0
# pause for the radio
time.sleep(PAUSE_AFTER_COMMAND)
return_value, out = subprocess.getstatusoutput('meshtastic --host localhost --info')
assert re.match(r'Connected to radio', out)
assert re.search(r'SECONDARY', out, re.MULTILINE)
assert re.search(r'testing1', out, re.MULTILINE)
assert return_value == 0
# pause for the radio
time.sleep(PAUSE_AFTER_COMMAND)
return_value, out = subprocess.getstatusoutput('meshtastic --host localhost --ch-add testing2')
assert re.match(r'Connected to radio', out)
assert return_value == 0
# pause for the radio
time.sleep(PAUSE_AFTER_COMMAND)
return_value, out = subprocess.getstatusoutput('meshtastic --host localhost --info')
assert re.match(r'Connected to radio', out)
assert re.search(r'testing2', out, re.MULTILINE)
assert return_value == 0
# pause for the radio
time.sleep(PAUSE_AFTER_COMMAND)
return_value, out = subprocess.getstatusoutput('meshtastic --host localhost --ch-del --ch-index 2')
assert re.match(r'Connected to radio', out)
assert return_value == 0
# pause for the radio
time.sleep(PAUSE_AFTER_COMMAND)
return_value, out = subprocess.getstatusoutput('meshtastic --host localhost --info')
assert re.match(r'Connected to radio', out)
assert re.search(r'testing1', out, re.MULTILINE)
assert return_value == 0
# pause for the radio
time.sleep(PAUSE_AFTER_COMMAND)
return_value, out = subprocess.getstatusoutput('meshtastic --host localhost --ch-del --ch-index 1')
assert re.match(r'Connected to radio', out)
assert return_value == 0
# pause for the radio
time.sleep(PAUSE_AFTER_COMMAND)
@pytest.mark.smokevirt
def test_smokevirt_ch_set_modem_config():
"""Test --ch-set modem_config"""
return_value, out = subprocess.getstatusoutput('meshtastic --host localhost --ch-set modem_config Bw31_25Cr48Sf512')
assert re.search(r'Warning: Need to specify', out, re.MULTILINE)
assert return_value == 1
# pause for the radio
time.sleep(PAUSE_AFTER_COMMAND)
return_value, out = subprocess.getstatusoutput('meshtastic --host localhost --info')
assert not re.search(r'Bw31_25Cr48Sf512', out, re.MULTILINE)
assert return_value == 0
# pause for the radio
time.sleep(PAUSE_AFTER_COMMAND)
return_value, out = subprocess.getstatusoutput('meshtastic --host localhost --ch-set modem_config Bw31_25Cr48Sf512 --ch-index 0')
assert re.match(r'Connected to radio', out)
assert re.search(r'^Set modem_config to Bw31_25Cr48Sf512', out, re.MULTILINE)
assert return_value == 0
# pause for the radio
time.sleep(PAUSE_AFTER_COMMAND)
return_value, out = subprocess.getstatusoutput('meshtastic --host localhost --info')
assert re.search(r'Bw31_25Cr48Sf512', out, re.MULTILINE)
assert return_value == 0
@pytest.mark.smokevirt
def test_smokevirt_seturl_default():
"""Test --seturl with default value"""
# set some channel value so we no longer have a default channel
return_value, out = subprocess.getstatusoutput('meshtastic --host localhost --ch-set name foo --ch-index 0')
assert return_value == 0
# pause for the radio
time.sleep(PAUSE_AFTER_COMMAND)
# ensure we no longer have a default primary channel
return_value, out = subprocess.getstatusoutput('meshtastic --host localhost --info')
assert not re.search('CgUYAyIBAQ', out, re.MULTILINE)
assert return_value == 0
url = "https://www.meshtastic.org/d/#CgUYAyIBAQ"
return_value, out = subprocess.getstatusoutput(f"meshtastic --host localhost --seturl {url}")
assert re.match(r'Connected to radio', out)
assert return_value == 0
# pause for the radio
time.sleep(PAUSE_AFTER_COMMAND)
return_value, out = subprocess.getstatusoutput('meshtastic --host localhost --info')
assert re.search('CgUYAyIBAQ', out, re.MULTILINE)
assert return_value == 0
@pytest.mark.smokevirt
def test_smokevirt_seturl_invalid_url():
"""Test --seturl with invalid url"""
# Note: This url is no longer a valid url.
url = "https://www.meshtastic.org/c/#GAMiENTxuzogKQdZ8Lz_q89Oab8qB0RlZmF1bHQ="
return_value, out = subprocess.getstatusoutput(f"meshtastic --host localhost --seturl {url}")
assert re.match(r'Connected to radio', out)
assert re.search('Warning: There were no settings', out, re.MULTILINE)
assert return_value == 1
# pause for the radio
time.sleep(PAUSE_AFTER_COMMAND)
@pytest.mark.smokevirt
def test_smokevirt_configure():
"""Test --configure"""
_ , out = subprocess.getstatusoutput(f"meshtastic --host localhost --configure example_config.yaml")
assert re.match(r'Connected to radio', out)
assert re.search('^Setting device owner to Bob TBeam', out, re.MULTILINE)
assert re.search('^Fixing altitude at 304 meters', out, re.MULTILINE)
assert re.search('^Fixing latitude at 35.8', out, re.MULTILINE)
assert re.search('^Fixing longitude at -93.8', out, re.MULTILINE)
assert re.search('^Setting device position', out, re.MULTILINE)
assert re.search('^Set region to 1', out, re.MULTILINE)
assert re.search('^Set is_always_powered to true', out, re.MULTILINE)
assert re.search('^Set send_owner_interval to 2', out, re.MULTILINE)
assert re.search('^Set screen_on_secs to 31536000', out, re.MULTILINE)
assert re.search('^Set wait_bluetooth_secs to 31536000', out, re.MULTILINE)
assert re.search('^Writing modified preferences to device', out, re.MULTILINE)
# pause for the radio
time.sleep(PAUSE_AFTER_REBOOT)
@pytest.mark.smokevirt
def test_smokevirt_set_ham():
"""Test --set-ham
Note: Do a factory reset after this setting so it is very short-lived.
"""
return_value, out = subprocess.getstatusoutput('meshtastic --host localhost --set-ham KI1234')
assert re.search(r'Setting Ham ID', out, re.MULTILINE)
assert return_value == 0
# pause for the radio
time.sleep(PAUSE_AFTER_REBOOT)
return_value, out = subprocess.getstatusoutput('meshtastic --host localhost --info')
assert re.search(r'Owner: KI1234', out, re.MULTILINE)
assert return_value == 0
@pytest.mark.smokevirt
def test_smokevirt_set_wifi_settings():
"""Test --set wifi_ssid and --set wifi_password"""
return_value, out = subprocess.getstatusoutput('meshtastic --host localhost --set wifi_ssid "some_ssid" --set wifi_password "temp1234"')
assert re.match(r'Connected to radio', out)
assert re.search(r'^Set wifi_ssid to some_ssid', out, re.MULTILINE)
assert re.search(r'^Set wifi_password to temp1234', out, re.MULTILINE)
assert return_value == 0
# pause for the radio
time.sleep(PAUSE_AFTER_COMMAND)
return_value, out = subprocess.getstatusoutput('meshtastic --host localhost --get wifi_ssid --get wifi_password')
assert re.search(r'^wifi_ssid: some_ssid', out, re.MULTILINE)
assert re.search(r'^wifi_password: sekrit', out, re.MULTILINE)
assert return_value == 0
@pytest.mark.smokevirt
def test_smokevirt_factory_reset():
"""Test factory reset"""
return_value, out = subprocess.getstatusoutput('meshtastic --host localhost --set factory_reset true')
assert re.match(r'Connected to radio', out)
assert re.search(r'^Set factory_reset to true', out, re.MULTILINE)
assert re.search(r'^Writing modified preferences to device', out, re.MULTILINE)
assert return_value == 0
# NOTE: The virtual radio will not respond well after this command. Need to re-start the virtual program at this point.
# TODO: fix?

View File

@@ -34,9 +34,9 @@ def test_StreamInterface_with_noProto(caplog, reset_globals):
assert data == test_data
# Note: This takes a bit, so moving from unit to slow
# Tip: If you want to see the print output, run with '-s' flag:
# pytest -s meshtastic/tests/test_stream_interface.py::test_sendToRadioImpl
## Note: This takes a bit, so moving from unit to slow
## Tip: If you want to see the print output, run with '-s' flag:
## pytest -s meshtastic/tests/test_stream_interface.py::test_sendToRadioImpl
@pytest.mark.unitslow
def test_sendToRadioImpl(caplog, reset_globals):
"""Test _sendToRadioImpl()"""
@@ -78,4 +78,3 @@ def test_sendToRadioImpl(caplog, reset_globals):
assert re.search(r'Sending: ', caplog.text, re.MULTILINE)
assert re.search(r'reading character', caplog.text, re.MULTILINE)
assert re.search(r'In reader loop', caplog.text, re.MULTILINE)
print(caplog.text)

View File

@@ -13,6 +13,7 @@ def test_TCPInterface(capsys):
"""Test that we can instantiate a TCPInterface"""
with patch('socket.socket') as mock_socket:
iface = TCPInterface(hostname='localhost', noProto=True)
iface.myConnect()
iface.showInfo()
iface.localNode.showInfo()
out, err = capsys.readouterr()
@@ -24,3 +25,11 @@ def test_TCPInterface(capsys):
assert err == ''
assert mock_socket.called
iface.close()
@pytest.mark.unit
def test_TCPInterface_without_connecting(capsys):
"""Test that we can instantiate a TCPInterface with connectNow as false"""
with patch('socket.socket'):
iface = TCPInterface(hostname='localhost', noProto=True, connectNow=False)
assert iface.socket is None

View File

@@ -0,0 +1,266 @@
"""Meshtastic unit tests for tunnel.py"""
import re
import sys
import logging
from unittest.mock import patch, MagicMock
import pytest
from ..tcp_interface import TCPInterface
from ..tunnel import Tunnel, onTunnelReceive
from ..globals import Globals
@pytest.mark.unit
@patch('platform.system')
def test_Tunnel_on_non_linux_system(mock_platform_system, reset_globals):
"""Test that we cannot instantiate a Tunnel on a non Linux system"""
a_mock = MagicMock()
a_mock.return_value = 'notLinux'
mock_platform_system.side_effect = a_mock
with patch('socket.socket') as mock_socket:
with pytest.raises(Exception) as pytest_wrapped_e:
iface = TCPInterface(hostname='localhost', noProto=True)
Tunnel(iface)
assert pytest_wrapped_e.type == Exception
assert mock_socket.called
@pytest.mark.unit
@patch('platform.system')
def test_Tunnel_without_interface(mock_platform_system, reset_globals):
"""Test that we can not instantiate a Tunnel without a valid interface"""
a_mock = MagicMock()
a_mock.return_value = 'Linux'
mock_platform_system.side_effect = a_mock
with pytest.raises(Exception) as pytest_wrapped_e:
Tunnel(None)
assert pytest_wrapped_e.type == Exception
@pytest.mark.unitslow
@patch('platform.system')
def test_Tunnel_with_interface(mock_platform_system, caplog, reset_globals, iface_with_nodes):
"""Test that we can not instantiate a Tunnel without a valid interface"""
iface = iface_with_nodes
iface.myInfo.my_node_num = 2475227164
a_mock = MagicMock()
a_mock.return_value = 'Linux'
mock_platform_system.side_effect = a_mock
with caplog.at_level(logging.WARNING):
with patch('socket.socket'):
tun = Tunnel(iface)
assert tun == Globals.getInstance().get_tunnelInstance()
iface.close()
assert re.search(r'Not creating a TapDevice()', caplog.text, re.MULTILINE)
assert re.search(r'Not starting TUN reader', caplog.text, re.MULTILINE)
assert re.search(r'Not sending packet', caplog.text, re.MULTILINE)
@pytest.mark.unitslow
@patch('platform.system')
def test_onTunnelReceive_from_ourselves(mock_platform_system, caplog, reset_globals, iface_with_nodes):
"""Test onTunnelReceive"""
iface = iface_with_nodes
iface.myInfo.my_node_num = 2475227164
sys.argv = ['']
Globals.getInstance().set_args(sys.argv)
packet = {'decoded': { 'payload': 'foo'}, 'from': 2475227164}
a_mock = MagicMock()
a_mock.return_value = 'Linux'
mock_platform_system.side_effect = a_mock
with caplog.at_level(logging.DEBUG):
with patch('socket.socket'):
tun = Tunnel(iface)
Globals.getInstance().set_tunnelInstance(tun)
onTunnelReceive(packet, iface)
assert re.search(r'in onTunnelReceive', caplog.text, re.MULTILINE)
assert re.search(r'Ignoring message we sent', caplog.text, re.MULTILINE)
@pytest.mark.unit
@patch('platform.system')
def test_onTunnelReceive_from_someone_else(mock_platform_system, caplog, reset_globals, iface_with_nodes):
"""Test onTunnelReceive"""
iface = iface_with_nodes
iface.myInfo.my_node_num = 2475227164
sys.argv = ['']
Globals.getInstance().set_args(sys.argv)
packet = {'decoded': { 'payload': 'foo'}, 'from': 123}
a_mock = MagicMock()
a_mock.return_value = 'Linux'
mock_platform_system.side_effect = a_mock
with caplog.at_level(logging.DEBUG):
with patch('socket.socket'):
tun = Tunnel(iface)
Globals.getInstance().set_tunnelInstance(tun)
onTunnelReceive(packet, iface)
assert re.search(r'in onTunnelReceive', caplog.text, re.MULTILINE)
@pytest.mark.unitslow
@patch('platform.system')
def test_shouldFilterPacket_random(mock_platform_system, caplog, reset_globals, iface_with_nodes):
"""Test _shouldFilterPacket()"""
iface = iface_with_nodes
iface.noProto = True
# random packet
packet = b'1234567890123456789012345678901234567890'
a_mock = MagicMock()
a_mock.return_value = 'Linux'
mock_platform_system.side_effect = a_mock
with caplog.at_level(logging.DEBUG):
with patch('socket.socket'):
tun = Tunnel(iface)
ignore = tun._shouldFilterPacket(packet)
assert not ignore
@pytest.mark.unitslow
@patch('platform.system')
def test_shouldFilterPacket_in_blacklist(mock_platform_system, caplog, reset_globals, iface_with_nodes):
"""Test _shouldFilterPacket()"""
iface = iface_with_nodes
iface.noProto = True
# faked IGMP
packet = b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x02\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'
a_mock = MagicMock()
a_mock.return_value = 'Linux'
mock_platform_system.side_effect = a_mock
with caplog.at_level(logging.DEBUG):
with patch('socket.socket'):
tun = Tunnel(iface)
ignore = tun._shouldFilterPacket(packet)
assert ignore
@pytest.mark.unitslow
@patch('platform.system')
def test_shouldFilterPacket_icmp(mock_platform_system, caplog, reset_globals, iface_with_nodes):
"""Test _shouldFilterPacket()"""
iface = iface_with_nodes
iface.noProto = True
# faked ICMP
packet = b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'
a_mock = MagicMock()
a_mock.return_value = 'Linux'
mock_platform_system.side_effect = a_mock
with caplog.at_level(logging.DEBUG):
with patch('socket.socket'):
tun = Tunnel(iface)
ignore = tun._shouldFilterPacket(packet)
assert re.search(r'forwarding ICMP message', caplog.text, re.MULTILINE)
assert not ignore
@pytest.mark.unit
@patch('platform.system')
def test_shouldFilterPacket_udp(mock_platform_system, caplog, reset_globals, iface_with_nodes):
"""Test _shouldFilterPacket()"""
iface = iface_with_nodes
iface.noProto = True
# faked UDP
packet = b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x11\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'
a_mock = MagicMock()
a_mock.return_value = 'Linux'
mock_platform_system.side_effect = a_mock
with caplog.at_level(logging.DEBUG):
with patch('socket.socket'):
tun = Tunnel(iface)
ignore = tun._shouldFilterPacket(packet)
assert re.search(r'forwarding udp', caplog.text, re.MULTILINE)
assert not ignore
@pytest.mark.unitslow
@patch('platform.system')
def test_shouldFilterPacket_udp_blacklisted(mock_platform_system, caplog, reset_globals, iface_with_nodes):
"""Test _shouldFilterPacket()"""
iface = iface_with_nodes
iface.noProto = True
# faked UDP
packet = b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x11\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x07\x6c\x07\x6c\x00\x00\x00'
a_mock = MagicMock()
a_mock.return_value = 'Linux'
mock_platform_system.side_effect = a_mock
# Note: custom logging level
LOG_TRACE = 5
with caplog.at_level(LOG_TRACE):
with patch('socket.socket'):
tun = Tunnel(iface)
ignore = tun._shouldFilterPacket(packet)
assert re.search(r'ignoring blacklisted UDP', caplog.text, re.MULTILINE)
assert ignore
@pytest.mark.unit
@patch('platform.system')
def test_shouldFilterPacket_tcp(mock_platform_system, caplog, reset_globals, iface_with_nodes):
"""Test _shouldFilterPacket()"""
iface = iface_with_nodes
iface.noProto = True
# faked TCP
packet = b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x06\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'
a_mock = MagicMock()
a_mock.return_value = 'Linux'
mock_platform_system.side_effect = a_mock
with caplog.at_level(logging.DEBUG):
with patch('socket.socket'):
tun = Tunnel(iface)
ignore = tun._shouldFilterPacket(packet)
assert re.search(r'forwarding tcp', caplog.text, re.MULTILINE)
assert not ignore
@pytest.mark.unitslow
@patch('platform.system')
def test_shouldFilterPacket_tcp_blacklisted(mock_platform_system, caplog, reset_globals, iface_with_nodes):
"""Test _shouldFilterPacket()"""
iface = iface_with_nodes
iface.noProto = True
# faked TCP
packet = b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x06\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x17\x0c\x17\x0c\x00\x00\x00'
a_mock = MagicMock()
a_mock.return_value = 'Linux'
mock_platform_system.side_effect = a_mock
# Note: custom logging level
LOG_TRACE = 5
with caplog.at_level(LOG_TRACE):
with patch('socket.socket'):
tun = Tunnel(iface)
ignore = tun._shouldFilterPacket(packet)
assert re.search(r'ignoring blacklisted TCP', caplog.text, re.MULTILINE)
assert ignore
@pytest.mark.unitslow
@patch('platform.system')
def test_ipToNodeId_none(mock_platform_system, caplog, reset_globals, iface_with_nodes):
"""Test _ipToNodeId()"""
iface = iface_with_nodes
iface.noProto = True
a_mock = MagicMock()
a_mock.return_value = 'Linux'
mock_platform_system.side_effect = a_mock
with caplog.at_level(logging.DEBUG):
with patch('socket.socket'):
tun = Tunnel(iface)
nodeid = tun._ipToNodeId('something not useful')
assert nodeid is None
@pytest.mark.unitslow
@patch('platform.system')
def test_ipToNodeId_all(mock_platform_system, caplog, reset_globals, iface_with_nodes):
"""Test _ipToNodeId()"""
iface = iface_with_nodes
iface.noProto = True
a_mock = MagicMock()
a_mock.return_value = 'Linux'
mock_platform_system.side_effect = a_mock
with caplog.at_level(logging.DEBUG):
with patch('socket.socket'):
tun = Tunnel(iface)
nodeid = tun._ipToNodeId(b'\x00\x00\xff\xff')
assert nodeid == '^all'

View File

@@ -3,12 +3,14 @@
import re
import logging
from unittest.mock import patch
import pytest
from meshtastic.util import (fixme, stripnl, pskToString, our_exit,
support_info, genPSK256, fromStr, fromPSK,
quoteBooleans, catchAndIgnore,
remove_keys_from_dict)
remove_keys_from_dict, Timeout, hexstr,
ipstr, readnet_u16, findPorts, convert_mac_addr)
@pytest.mark.unit
@@ -39,7 +41,7 @@ def test_fromStr():
assert fromStr('abc') == 'abc'
@pytest.mark.unit
@pytest.mark.unitslow
def test_quoteBooleans():
"""Test quoteBooleans"""
assert quoteBooleans('') == ''
@@ -86,13 +88,13 @@ def test_pskToString_one_byte_zero_value():
assert pskToString(bytes([0x00])) == 'unencrypted'
@pytest.mark.unit
@pytest.mark.unitslow
def test_pskToString_one_byte_non_zero_value():
"""Test pskToString one byte that is non-zero"""
assert pskToString(bytes([0x01])) == 'default'
@pytest.mark.unit
@pytest.mark.unitslow
def test_pskToString_many_bytes():
"""Test pskToString many bytes"""
assert pskToString(bytes([0x02, 0x01])) == 'secret'
@@ -104,25 +106,31 @@ def test_pskToString_simple():
assert pskToString(bytes([0x03])) == 'simple2'
@pytest.mark.unit
def test_our_exit_zero_return_value():
@pytest.mark.unitslow
def test_our_exit_zero_return_value(capsys):
"""Test our_exit with a zero return value"""
with pytest.raises(SystemExit) as pytest_wrapped_e:
our_exit("Warning: Some message", 0)
out, err = capsys.readouterr()
assert re.search(r'Warning: Some message', out, re.MULTILINE)
assert err == ''
assert pytest_wrapped_e.type == SystemExit
assert pytest_wrapped_e.value.code == 0
@pytest.mark.unit
def test_our_exit_non_zero_return_value():
@pytest.mark.unitslow
def test_our_exit_non_zero_return_value(capsys):
"""Test our_exit with a non-zero return value"""
with pytest.raises(SystemExit) as pytest_wrapped_e:
our_exit("Error: Some message", 1)
out, err = capsys.readouterr()
assert re.search(r'Error: Some message', out, re.MULTILINE)
assert err == ''
assert pytest_wrapped_e.type == SystemExit
assert pytest_wrapped_e.value.code == 1
@pytest.mark.unit
@pytest.mark.unitslow
def test_fixme():
"""Test fixme()"""
with pytest.raises(Exception) as pytest_wrapped_e:
@@ -152,13 +160,13 @@ def test_catchAndIgnore(caplog):
assert re.search(r'Exception thrown in something', caplog.text, re.MULTILINE)
@pytest.mark.unit
@pytest.mark.unitslow
def test_remove_keys_from_dict_empty_keys_empty_dict():
"""Test when keys and dict both are empty"""
assert not remove_keys_from_dict((), {})
@pytest.mark.unit
@pytest.mark.unitslow
def test_remove_keys_from_dict_empty_dict():
"""Test when dict is empty"""
assert not remove_keys_from_dict(('a'), {})
@@ -170,7 +178,74 @@ def test_remove_keys_from_dict_empty_keys():
assert remove_keys_from_dict((), {'a':1}) == {'a':1}
@pytest.mark.unit
@pytest.mark.unitslow
def test_remove_keys_from_dict():
"""Test remove_keys_from_dict()"""
assert remove_keys_from_dict(('b'), {'a':1, 'b':2}) == {'a':1}
@pytest.mark.unitslow
def test_remove_keys_from_dict_multiple_keys():
"""Test remove_keys_from_dict()"""
keys = ('a', 'b')
adict = {'a': 1, 'b': 2, 'c': 3}
assert remove_keys_from_dict(keys, adict) == {'c':3}
@pytest.mark.unit
def test_remove_keys_from_dict_nested():
"""Test remove_keys_from_dict()"""
keys = ('b')
adict = {'a': {'b': 1}, 'b': 2, 'c': 3}
exp = {'a': {}, 'c': 3}
assert remove_keys_from_dict(keys, adict) == exp
@pytest.mark.unitslow
def test_Timeout_not_found():
"""Test Timeout()"""
to = Timeout(0.2)
attrs = ('foo')
to.waitForSet('bar', attrs)
@pytest.mark.unitslow
def test_Timeout_found():
"""Test Timeout()"""
to = Timeout(0.2)
attrs = ()
to.waitForSet('bar', attrs)
@pytest.mark.unitslow
def test_hexstr():
"""Test hexstr()"""
assert hexstr(b'123') == '31:32:33'
assert hexstr(b'') == ''
@pytest.mark.unitslow
def test_ipstr():
"""Test ipstr()"""
assert ipstr(b'1234') == '49.50.51.52'
assert ipstr(b'') == ''
@pytest.mark.unitslow
def test_readnet_u16():
"""Test readnet_u16()"""
assert readnet_u16(b'123456', 2) == 13108
@pytest.mark.unitslow
@patch('serial.tools.list_ports.comports', return_value=[])
def test_findPorts_when_none_found(patch_comports):
"""Test findPorts()"""
assert not findPorts()
@pytest.mark.unitslow
def test_convert_mac_addr():
"""Test convert_mac_addr()"""
assert convert_mac_addr('/c0gFyhb') == 'fd:cd:20:17:28:5b'
assert convert_mac_addr('') == ''

View File

@@ -17,61 +17,28 @@
import logging
import threading
import platform
from pubsub import pub
from pytap2 import TapDevice
from . import portnums_pb2
# A new non standard log level that is lower level than DEBUG
LOG_TRACE = 5
# fixme - find a way to move onTunnelReceive inside of the class
tunnelInstance = None
"""A list of chatty UDP services we should never accidentally
forward to our slow network"""
udpBlacklist = {
1900, # SSDP
5353, # multicast DNS
}
"""A list of TCP services to block"""
tcpBlacklist = {}
"""A list of protocols we ignore"""
protocolBlacklist = {
0x02, # IGMP
0x80, # Service-Specific Connection-Oriented Protocol in a Multilink and Connectionless Environment
}
def hexstr(barray):
"""Print a string of hex digits"""
return ":".join('{:02x}'.format(x) for x in barray)
def ipstr(barray):
"""Print a string of ip digits"""
return ".".join('{}'.format(x) for x in barray)
def readnet_u16(p, offset):
"""Read big endian u16 (network byte order)"""
return p[offset] * 256 + p[offset + 1]
from meshtastic import portnums_pb2
from meshtastic.util import ipstr, readnet_u16
from meshtastic.globals import Globals
def onTunnelReceive(packet, interface):
"""Callback for received tunneled messages from mesh
FIXME figure out how to do closures with methods in python"""
"""Callback for received tunneled messages from mesh."""
logging.debug(f'in onTunnelReceive()')
our_globals = Globals.getInstance()
tunnelInstance = our_globals.get_tunnelInstance()
tunnelInstance.onReceive(packet)
class Tunnel:
"""A TUN based IP tunnel over meshtastic"""
def __init__(self, iface, subnet=None, netmask="255.255.0.0"):
def __init__(self, iface, subnet='10.115', netmask="255.255.0.0"):
"""
Constructor
@@ -79,35 +46,69 @@ class Tunnel:
subnet is used to construct our network number (normally 10.115.x.x)
"""
if subnet is None:
subnet = "10.115"
if not iface:
raise Exception("Tunnel() must have a interface")
self.iface = iface
self.subnetPrefix = subnet
global tunnelInstance
tunnelInstance = self
if platform.system() != 'Linux':
raise Exception("Tunnel() can only be run instantiated on a Linux system")
our_globals = Globals.getInstance()
our_globals.set_tunnelInstance(self)
"""A list of chatty UDP services we should never accidentally
forward to our slow network"""
self.udpBlacklist = {
1900, # SSDP
5353, # multicast DNS
}
"""A list of TCP services to block"""
self.tcpBlacklist = {
5900, # VNC (Note: Only adding for testing purposes.)
}
"""A list of protocols we ignore"""
self.protocolBlacklist = {
0x02, # IGMP
0x80, # Service-Specific Connection-Oriented Protocol in a Multilink and Connectionless Environment
}
# A new non standard log level that is lower level than DEBUG
self.LOG_TRACE = 5
# TODO: check if root?
logging.info("Starting IP to mesh tunnel (you must be root for this *pre-alpha* "\
"feature to work). Mesh members:")
pub.subscribe(onTunnelReceive, "meshtastic.receive.data.IP_TUNNEL_APP")
myAddr = self._nodeNumToIp(self.iface.myInfo.my_node_num)
for node in self.iface.nodes.values():
nodeId = node["user"]["id"]
ip = self._nodeNumToIp(node["num"])
logging.info(f"Node { nodeId } has IP address { ip }")
if self.iface.nodes:
for node in self.iface.nodes.values():
nodeId = node["user"]["id"]
ip = self._nodeNumToIp(node["num"])
logging.info(f"Node { nodeId } has IP address { ip }")
logging.debug("creating TUN device with MTU=200")
# FIXME - figure out real max MTU, it should be 240 - the overhead bytes for SubPacket and Data
self.tun = TapDevice(name="mesh")
self.tun.up()
self.tun.ifconfig(address=myAddr, netmask=netmask, mtu=200)
logging.debug(f"starting TUN reader, our IP address is {myAddr}")
self._rxThread = threading.Thread(
target=self.__tunReader, args=(), daemon=True)
self._rxThread.start()
self.tun = None
if self.iface.noProto:
logging.warning(f"Not creating a TapDevice() because it is disabled by noProto")
else:
self.tun = TapDevice(name="mesh")
self.tun.up()
self.tun.ifconfig(address=myAddr, netmask=netmask, mtu=200)
self._rxThread = None
if self.iface.noProto:
logging.warning(f"Not starting TUN reader because it is disabled by noProto")
else:
logging.debug(f"starting TUN reader, our IP address is {myAddr}")
self._rxThread = threading.Thread(target=self.__tunReader, args=(), daemon=True)
self._rxThread.start()
def onReceive(self, packet):
"""onReceive"""
@@ -115,12 +116,12 @@ class Tunnel:
if packet["from"] == self.iface.myInfo.my_node_num:
logging.debug("Ignoring message we sent")
else:
logging.debug(
f"Received mesh tunnel message type={type(p)} len={len(p)}")
logging.debug(f"Received mesh tunnel message type={type(p)} len={len(p)}")
# we don't really need to check for filtering here (sender should have checked),
# but this provides useful debug printing on types of packets received
if not self._shouldFilterPacket(p):
self.tun.write(p)
if not self.iface.noProto:
if not self._shouldFilterPacket(p):
self.tun.write(p)
def _shouldFilterPacket(self, p):
"""Given a packet, decode it and return true if it should be ignored"""
@@ -129,10 +130,9 @@ class Tunnel:
destAddr = p[16:20]
subheader = 20
ignore = False # Assume we will be forwarding the packet
if protocol in protocolBlacklist:
if protocol in self.protocolBlacklist:
ignore = True
logging.log(
LOG_TRACE, f"Ignoring blacklisted protocol 0x{protocol:02x}")
logging.log(self.LOG_TRACE, f"Ignoring blacklisted protocol 0x{protocol:02x}")
elif protocol == 0x01: # ICMP
icmpType = p[20]
icmpCode = p[21]
@@ -145,19 +145,17 @@ class Tunnel:
elif protocol == 0x11: # UDP
srcport = readnet_u16(p, subheader)
destport = readnet_u16(p, subheader + 2)
if destport in udpBlacklist:
if destport in self.udpBlacklist:
ignore = True
logging.log(
LOG_TRACE, f"ignoring blacklisted UDP port {destport}")
logging.log(self.LOG_TRACE, f"ignoring blacklisted UDP port {destport}")
else:
logging.debug(
f"forwarding udp srcport={srcport}, destport={destport}")
logging.debug(f"forwarding udp srcport={srcport}, destport={destport}")
elif protocol == 0x06: # TCP
srcport = readnet_u16(p, subheader)
destport = readnet_u16(p, subheader + 2)
if destport in tcpBlacklist:
if destport in self.tcpBlacklist:
ignore = True
logging.log(LOG_TRACE, f"ignoring blacklisted TCP port {destport}")
logging.log(self.LOG_TRACE, f"ignoring blacklisted TCP port {destport}")
else:
logging.debug(f"forwarding tcp srcport={srcport}, destport={destport}")
else:

View File

@@ -4,6 +4,7 @@ import traceback
from queue import Queue
import os
import sys
import base64
import time
import platform
import logging
@@ -169,8 +170,7 @@ class DeferredExecution():
o = self.queue.get()
o()
except:
logging.error(
f"Unexpected error in deferred execution {sys.exc_info()[0]}")
logging.error(f"Unexpected error in deferred execution {sys.exc_info()[0]}")
print(traceback.format_exc())
@@ -195,7 +195,8 @@ def support_info():
print(' Machine: {0}'.format(platform.uname().machine))
print(' Encoding (stdin): {0}'.format(sys.stdin.encoding))
print(' Encoding (stdout): {0}'.format(sys.stdout.encoding))
print(' meshtastic: v{0}'.format(pkg_resources.require('meshtastic')[0].version))
the_version = pkg_resources.get_distribution("meshtastic").version
print(' meshtastic: v{0}'.format(the_version))
print(' Executable: {0}'.format(sys.argv[0]))
print(' Python: {0} {1} {2}'.format(platform.python_version(),
platform.python_implementation(), platform.python_compiler()))
@@ -204,9 +205,39 @@ def support_info():
def remove_keys_from_dict(keys, adict):
"""Return a dictionary without some keys in it."""
newdict = adict
"""Return a dictionary without some keys in it.
Will removed nested keys.
"""
for key in keys:
if key in adict:
del newdict[key]
return newdict
try:
del adict[key]
except:
pass
for val in adict.values():
if isinstance(val, dict):
remove_keys_from_dict(keys, val)
return adict
def hexstr(barray):
"""Print a string of hex digits"""
return ":".join('{:02x}'.format(x) for x in barray)
def ipstr(barray):
"""Print a string of ip digits"""
return ".".join('{}'.format(x) for x in barray)
def readnet_u16(p, offset):
"""Read big endian u16 (network byte order)"""
return p[offset] * 256 + p[offset + 1]
def convert_mac_addr(val):
"""Convert the base 64 encoded value to a mac address
val - base64 encoded value (ex: '/c0gFyhb'))
returns: a string formatted like a mac address (ex: 'fd:cd:20:17:28:5b')
"""
val_as_bytes = base64.b64decode(val)
return hexstr(val_as_bytes)

2
proto

Submodule proto updated: 1d3b4806ab...18fc4cdb52

View File

@@ -1,11 +1,12 @@
[pytest]
addopts = -m "not int and not smoke1 and not smoke2 and not smokewifi and not examples"
addopts = -m "not int and not smoke1 and not smoke2 and not smokewifi and not examples and not smokevirt"
markers =
unit: marks tests as unit tests
unitslow: marks slow unit tests
int: marks tests as integration tests
smokevirt: marks tests as smoke tests against virtual device
smoke1: runs smoke tests on a single device connected via USB
smoke2: runs smoke tests on a two devices connected via USB
smokewifi: runs smoke test on an esp32 device setup with wifi

View File

@@ -12,7 +12,7 @@ with open("README.md", "r") as fh:
# This call to setup() does all the work
setup(
name="meshtastic",
version="1.2.49",
version="1.2.53",
description="Python API & client shell for talking to Meshtastic devices",
long_description=long_description,
long_description_content_type="text/markdown",
@@ -23,7 +23,11 @@ setup(
classifiers=[
"License :: OSI Approved :: MIT License",
"Programming Language :: Python :: 3",
"Programming Language :: Python :: 3.6",
"Programming Language :: Python :: 3.7",
"Programming Language :: Python :: 3.8",
"Programming Language :: Python :: 3.9",
"Programming Language :: Python :: 3.10",
],
packages=["meshtastic"],
include_package_data=True,

5
vercel.json Normal file
View File

@@ -0,0 +1,5 @@
{
"github": {
"silent": true
}
}