Compare commits

..

56 Commits

Author SHA1 Message Date
mkinney
7903a4720d Merge pull request #330 from mkinney/1.2-legacy
1.2 legacy
2022-04-27 12:04:37 -07:00
mkinney
19648e3418 Merge branch 'meshtastic:1.2-legacy' into 1.2-legacy 2022-04-27 12:01:18 -07:00
Mike Kinney
49ee6b5988 update protobufs for 1.2 2022-04-27 12:00:37 -07:00
github-actions
5ac9867a91 bump version 2022-04-27 18:37:44 +00:00
mkinney
efa9469602 Merge pull request #329 from mkinney/1.2-legacy
1.2 legacy
2022-04-27 11:34:28 -07:00
Mike Kinney
e2f38b640b remove epaper 2022-04-27 11:32:14 -07:00
Mike Kinney
60d7540950 add nano_g1 2022-04-27 11:31:19 -07:00
github-actions
5761c89818 bump version 2022-03-21 17:49:55 +00:00
mkinney
d205d8e9ba Merge pull request #309 from mkinney/wifi_password_check
add wifi min length check
2022-03-21 10:48:05 -07:00
Mike Kinney
384ad456ae add wifi min length check 2022-03-21 10:44:46 -07:00
github-actions
a99397468f bump version 2022-03-16 02:59:04 +00:00
mkinney
f31297194a Merge pull request #305 from mkinney/1.2-legacy
fix the vendor and product id for tlora v1
2022-03-15 19:53:29 -07:00
Mike Kinney
4dc3bea674 fix the vendor and product id for tlora v1 2022-03-15 19:52:45 -07:00
github-actions
95f5d77c47 bump version 2022-03-16 02:34:58 +00:00
mkinney
5034e2ca43 Merge pull request #304 from mkinney/1.2-legacy
fix some tlora entries
2022-03-15 19:33:47 -07:00
Mike Kinney
eb4f68fccc fix some tlora entries 2022-03-15 19:32:27 -07:00
github-actions
de03f8e1fb bump version 2022-03-08 18:57:44 +00:00
mkinney
56f3e8a893 Merge pull request #297 from mkinney/1.2-legacy
refactor util; add duplicate check
2022-03-08 10:38:37 -08:00
mkinney
fbe0c09909 Merge branch '1.2-legacy' into 1.2-legacy 2022-03-08 10:37:15 -08:00
Mike Kinney
20c65974e9 refactor code to util 2022-03-08 10:34:46 -08:00
Mike Kinney
96e42ac3f2 add duplicate check 2022-03-08 10:24:44 -08:00
github-actions
3912f5728a bump version 2022-03-08 06:40:00 +00:00
mkinney
c016176520 Update release.yml 2022-03-07 22:38:45 -08:00
Mike Kinney
e6999ba5ad keep sha ref and use it for subsequent checkouts 2022-03-07 22:38:27 -08:00
Mike Kinney
5590dbeb6f do not checkout the code again 2022-03-07 22:38:06 -08:00
mkinney
e8a2909173 Merge pull request #294 from mkinney/bug_when_ports_not_sorted
fix when ports are not sorted
2022-03-07 22:37:49 -08:00
github-actions
9214b2ffcc bump version 2022-03-02 00:20:31 +00:00
mkinney
cfb14d4b77 Merge pull request #287 from mkinney/changes_to_smokevirt
increase timeout; ensure secondary channel is deleted before trying t…
2022-03-01 16:19:14 -08:00
Mike Kinney
5965615e17 increase timeout; ensure secondary channel is deleted before trying to add it 2022-03-02 00:04:21 +00:00
Jm Casler
40eb7d8515 updating proto submodule to latest 2022-02-28 22:03:42 -08:00
Jm Casler
e2f36a9bea updating proto submodule to latest 2022-02-28 19:36:02 -08:00
Jm Casler
a0ba644488 updating proto submodule to latest 2022-02-27 10:01:44 -08:00
Jm Casler
2f67f344b7 updating proto submodule to latest 2022-02-27 09:57:16 -08:00
Jm Casler
8bb208aed1 updating proto submodule to latest 2022-02-27 09:52:34 -08:00
Jm Casler
48b145a592 updating proto submodule to latest 2022-02-27 01:00:38 -08:00
Jm Casler
65de9200fb updating proto submodule to latest 2022-02-27 00:48:57 -08:00
Jm Casler
e51d7a7a18 updating proto submodule to latest 2022-02-27 00:38:09 -08:00
Jm Casler
c4af50d63a updating proto submodule to latest 2022-02-26 21:22:33 -08:00
Jm Casler
933fe8953a updating proto submodule to latest 2022-02-26 21:09:20 -08:00
mkinney
66aa492d50 Merge pull request #283 from mkinney/add_cm
add cm
2022-02-25 23:44:13 -08:00
Mike Kinney
730934f520 add cm 2022-02-25 23:36:05 -08:00
mkinney
d2ec09eaf8 Merge pull request #281 from prampec/extend_canned_message_length
Canned message - Extend messages length
2022-02-25 22:58:57 -08:00
mkinney
7fd3b313b2 Update release.yml 2022-02-25 22:15:36 -08:00
github-actions
89f1549741 bump version 2022-02-26 06:11:26 +00:00
github-actions
e91015f5c8 bump version 2022-02-26 06:07:50 +00:00
mkinney
d9c3edfb12 Update release.yml 2022-02-25 22:06:27 -08:00
mkinney
cbf9696f47 Merge pull request #282 from mkinney/master
improve the release process
2022-02-25 22:04:48 -08:00
Mike Kinney
24e556b9a7 improve the release process 2022-02-25 22:01:21 -08:00
mkinney
eaf29512b6 Update standalone_readme.txt 2022-02-25 21:38:23 -08:00
Jm Casler
0c1e0ec375 updating proto submodule to latest 2022-02-22 17:10:04 -08:00
Balazs Kelemen
b56a054f50 Canned message - Extend messages length 2022-02-21 22:06:03 +01:00
Jm Casler
33ff4e36de updating proto submodule to latest 2022-02-20 01:32:53 -08:00
Jm Casler
bf879934e6 updating proto submodule to latest 2022-02-19 23:48:24 -08:00
Jm Casler
b878fa3a80 updating proto submodule to latest 2022-02-19 23:17:50 -08:00
Jm Casler
901849f176 updating proto submodule to latest 2022-02-19 23:10:53 -08:00
Jm Casler
371c0d22c2 updating proto submodule to latest 2022-02-19 22:55:38 -08:00
29 changed files with 1293 additions and 261 deletions

View File

@@ -1,66 +1,79 @@
name: Make Release name: Make Release
on: on: workflow_dispatch
workflow_dispatch:
inputs:
version:
description: "Release version (Example: 1.0.0, must match 'version' in setup.py)"
required: true
default: '1.0.0'
jobs: jobs:
release_create: release_create:
runs-on: ubuntu-latest runs-on: ubuntu-latest
outputs:
version: ${{ steps.get_version.outputs.version }}
upload_url: ${{ steps.create_release.outputs.upload_url }}
new_sha: ${{ steps.commit_updated.outputs.sha }}
steps: steps:
- name: Checkout
uses: actions/checkout@v2
- name: Bump version
run: >-
bin/bump_version.py
- name: Commit updated version.py
id: commit_updated
run: |
git config --global user.name 'github-actions'
git config --global user.email 'bot@noreply.github.com'
git remote set-url origin https://x-access-token:${{ secrets.GITHUB_TOKEN }}@github.com/${{ github.repository }}
git add setup.py
git commit -m "bump version" && git push || echo "No changes to commit"
git log -n 1 --pretty=format:"%H" | tail -n 1 | awk '{print "::set-output name=sha::"$0}'
- name: Get version
id: get_version
run: >-
bin/show_version.py
- name: Create GitHub release - name: Create GitHub release
uses: actions/create-release@v1 uses: actions/create-release@v1
id: create_release id: create_release
with: with:
draft: true draft: true
prerelease: true prerelease: true
release_name: ${{ github.event.inputs.version}} release_name: Meshtastic Python ${{ steps.get_version.outputs.version }}
tag_name: ${{ github.event.inputs.version}} tag_name: ${{ steps.get_version.outputs.version }}
body: | body: |
Autogenerated by github action, developer should edit as required before publishing... Autogenerated by github action, developer should edit as required before publishing...
env: env:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
outputs: - name: Set up Python 3.9
upload_url: ${{ steps.create_release.outputs.upload_url }} uses: actions/setup-python@v2
with:
python-version: 3.9
publish_to_pypi: - name: Install pypa/build
runs-on: ubuntu-latest run: >-
steps: python -m
- name: Checkout pip install
uses: actions/checkout@v2 build
--user
- name: Set up Python 3.9 - name: Build a binary wheel and a source tarball
uses: actions/setup-python@v2 run: >-
with: python -m
python-version: 3.9 build
--sdist
--wheel
--outdir dist/
.
- name: Install pypa/build - name: Publish to PyPI
run: >- uses: pypa/gh-action-pypi-publish@master
python -m with:
pip install user: __token__
build password: ${{ secrets.PYPI_API_TOKEN }}
--user
- name: Build a binary wheel and a source tarball
run: >-
python -m
build
--sdist
--wheel
--outdir dist/
.
- name: Publish to PyPI
uses: pypa/gh-action-pypi-publish@master
with:
user: __token__
password: ${{ secrets.PYPI_API_TOKEN }}
build-and-publish-mac: build-and-publish-mac:
@@ -69,7 +82,9 @@ jobs:
steps: steps:
- name: Checkout - name: Checkout
uses: actions/checkout@v2 uses: actions/checkout@v3
with:
ref: ${{ needs.release_create.outputs.new_sha }}
- name: Set up Python 3.9 - name: Set up Python 3.9
uses: actions/setup-python@v2 uses: actions/setup-python@v2
@@ -114,7 +129,9 @@ jobs:
steps: steps:
- name: Checkout - name: Checkout
uses: actions/checkout@v2 uses: actions/checkout@v3
with:
ref: ${{ needs.release_create.outputs.new_sha }}
- name: Set up Python 3.9 - name: Set up Python 3.9
uses: actions/setup-python@v2 uses: actions/setup-python@v2
@@ -137,7 +154,7 @@ jobs:
asset_path: dist/meshtastic asset_path: dist/meshtastic
asset_name: meshtastic_ubuntu asset_name: meshtastic_ubuntu
asset_content_type: application/zip asset_content_type: application/zip
- name: Add readme.txt to release - name: Add readme.txt to release
uses: actions/upload-release-asset@v1 uses: actions/upload-release-asset@v1
env: env:
@@ -146,7 +163,7 @@ jobs:
upload_url: ${{ needs.release_create.outputs.upload_url }} upload_url: ${{ needs.release_create.outputs.upload_url }}
asset_path: standalone_readme.txt asset_path: standalone_readme.txt
asset_name: readme.txt asset_name: readme.txt
asset_content_type: text/plain asset_content_type: text/plain
build-and-publish-windows: build-and-publish-windows:
runs-on: windows-latest runs-on: windows-latest
@@ -154,7 +171,9 @@ jobs:
steps: steps:
- name: Checkout - name: Checkout
uses: actions/checkout@v2 uses: actions/checkout@v3
with:
ref: ${{ needs.release_create.outputs.new_sha }}
- name: Set up Python 3.9 - name: Set up Python 3.9
uses: actions/setup-python@v2 uses: actions/setup-python@v2

1
.gitignore vendored
View File

@@ -6,6 +6,7 @@ dist
log_* log_*
.eggs .eggs
nanopb-0.4.4 nanopb-0.4.4
nanopb-0.4.5
.*swp .*swp
.coverage .coverage
*.py-E *.py-E

1
.gitmodules vendored
View File

@@ -1,3 +1,4 @@
[submodule "proto"] [submodule "proto"]
path = proto path = proto
url = https://github.com/meshtastic/Meshtastic-protobufs.git url = https://github.com/meshtastic/Meshtastic-protobufs.git
branch = 1.2-legacy

View File

@@ -23,7 +23,7 @@ ignore-patterns=mqtt_pb2.py,channel_pb2.py,environmental_measurement_pb2.py,admi
# no Warning level messages displayed, use"--disable=all --enable=classes # no Warning level messages displayed, use"--disable=all --enable=classes
# --disable=W" # --disable=W"
# #
disable=invalid-name,fixme,logging-fstring-interpolation,too-many-statements,too-many-branches,too-many-locals,no-member,f-string-without-interpolation,protected-access,no-self-use,pointless-string-statement,too-few-public-methods,broad-except,no-else-return,no-else-raise,bare-except disable=invalid-name,fixme,logging-fstring-interpolation,too-many-statements,too-many-branches,too-many-locals,no-member,f-string-without-interpolation,protected-access,no-self-use,pointless-string-statement,too-few-public-methods,broad-except,no-else-return,no-else-raise,bare-except,too-many-public-methods
[BASIC] [BASIC]
@@ -41,7 +41,7 @@ bad-names=foo,bar,baz,toto,tutu,tata
max-line-length=150 max-line-length=150
# Maximum number of lines in a module # Maximum number of lines in a module
max-module-lines=1200 max-module-lines=1400

View File

@@ -22,6 +22,12 @@ lint:
slow: slow:
pytest -m unit --durations=5 pytest -m unit --durations=5
proto: FORCE
git submodule update --init --recursive
git pull --rebase
git submodule update --remote --merge
./bin/regen-protos.sh
# run the coverage report and open results in a browser # run the coverage report and open results in a browser
cov: cov:
pytest --cov-report html --cov=meshtastic pytest --cov-report html --cov=meshtastic

25
bin/bump_version.py Executable file
View File

@@ -0,0 +1,25 @@
#!/usr/bin/env python
"""Bump the version number"""
version_filename = "setup.py"
lines = None
with open(version_filename, 'r', encoding='utf-8') as f:
lines = f.readlines()
with open(version_filename, 'w', encoding='utf-8') as f:
for line in lines:
if line.lstrip().startswith("version="):
# get rid of quotes around the version
line = line.replace('"', '')
# get rid of trailing comma
line = line.replace(",", "")
# split on '='
words = line.split("=")
# split the version into parts (by period)
v = words[1].split(".")
ver = f'{v[0]}.{v[1]}.{int(v[2]) + 1}'
f.write(f' version="{ver}",\n')
else:
f.write(line)

20
bin/show_version.py Executable file
View File

@@ -0,0 +1,20 @@
#!/usr/bin/env python
"""Show the version number"""
version_filename = "setup.py"
lines = None
with open(version_filename, 'r', encoding='utf-8') as f:
lines = f.readlines()
for line in lines:
if line.lstrip().startswith("version="):
# get rid of quotes around the version
line2 = line.replace('"', '')
# get rid of the trailing comma
line2 = line2.replace(',', '')
# split on =
words = line2.split("=")
# Note: This format is for github actions
print(f'::set-output name=version::{words[1].strip()}')

View File

@@ -3,8 +3,7 @@
""" """
import sys import sys
from meshtastic.supported_device import get_unique_vendor_ids, active_ports_on_supported_devices from meshtastic.util import detect_supported_devices, get_unique_vendor_ids, active_ports_on_supported_devices
from meshtastic.util import detect_supported_devices
# simple arg check # simple arg check
if len(sys.argv) != 1: if len(sys.argv) != 1:

6
examples/show_ports.py Normal file
View File

@@ -0,0 +1,6 @@
"""Simple program to show serial ports.
"""
from meshtastic.util import findPorts
print(findPorts())

30
info/mac/nano_g1.txt Normal file
View File

@@ -0,0 +1,30 @@
meshtastic detected port: /dev/cu.wchusbserial53820208781
ioreg -p IOUSB
shows this:
| +-o USB Single Serial@14300000 <class AppleUSBDevice, id 0x1000407a5, registered, matched, active, busy 0 (18 ms), retain 14>
system_profiler SPUSBDataType > /tmp/a
with device plugged in
system_profiler SPUSBDataType > /tmp/b
with device not plugged in
diff /tmp/a /tmp/b
< USB Single Serial:
<
< Product ID: 0x55d4
< Vendor ID: 0x1a86
< Version: 4.43
< Serial Number: 5382020878
< Speed: Up to 12 Mb/s
< Location ID: 0x14300000 / 63
< Current Available (mA): 500
< Current Required (mA): 134
< Extra Operating Current (mA): 0

91
info/ubuntu/nano_g1.txt Normal file
View File

@@ -0,0 +1,91 @@
lsusb -d 1a86: -v
Bus 001 Device 013: ID 1a86:55d4 QinHeng Electronics
Couldn't open device, some information will be missing
Device Descriptor:
bLength 18
bDescriptorType 1
bcdUSB 1.10
bDeviceClass 2 Communications
bDeviceSubClass 0
bDeviceProtocol 0
bMaxPacketSize0 8
idVendor 0x1a86 QinHeng Electronics
idProduct 0x55d4
bcdDevice 4.43
iManufacturer 0
iProduct 2
iSerial 3
bNumConfigurations 1
Configuration Descriptor:
bLength 9
bDescriptorType 2
wTotalLength 0x0043
bNumInterfaces 2
bConfigurationValue 1
iConfiguration 0
bmAttributes 0xa0
(Bus Powered)
Remote Wakeup
MaxPower 134mA
Interface Descriptor:
bLength 9
bDescriptorType 4
bInterfaceNumber 0
bAlternateSetting 0
bNumEndpoints 1
bInterfaceClass 2 Communications
bInterfaceSubClass 2 Abstract (modem)
bInterfaceProtocol 1 AT-commands (v.25ter)
iInterface 0
CDC Header:
bcdCDC 1.10
CDC Call Management:
bmCapabilities 0x00
bDataInterface 1
CDC ACM:
bmCapabilities 0x02
line coding and serial state
CDC Union:
bMasterInterface 0
bSlaveInterface 1
Endpoint Descriptor:
bLength 7
bDescriptorType 5
bEndpointAddress 0x83 EP 3 IN
bmAttributes 3
Transfer Type Interrupt
Synch Type None
Usage Type Data
wMaxPacketSize 0x0010 1x 16 bytes
bInterval 1
Interface Descriptor:
bLength 9
bDescriptorType 4
bInterfaceNumber 1
bAlternateSetting 0
bNumEndpoints 2
bInterfaceClass 10 CDC Data
bInterfaceSubClass 0
bInterfaceProtocol 0
iInterface 0
Endpoint Descriptor:
bLength 7
bDescriptorType 5
bEndpointAddress 0x02 EP 2 OUT
bmAttributes 2
Transfer Type Bulk
Synch Type None
Usage Type Data
wMaxPacketSize 0x0020 1x 32 bytes
bInterval 0
Endpoint Descriptor:
bLength 7
bDescriptorType 5
bEndpointAddress 0x82 EP 2 IN
bmAttributes 2
Transfer Type Bulk
Synch Type None
Usage Type Data
wMaxPacketSize 0x0040 1x 64 bytes
bInterval 0

39
info/windows/nano_g1.txt Normal file
View File

@@ -0,0 +1,39 @@
Get-PnpDevice -PresentOnly | Format-List >a
Get-PnpDevice -PresentOnly | Format-List >b
Compare-Object (get-content a) (Get-Content b)
InputObject SideIndicator
----------- -------------
Caption : USB-Enhanced-SERIAL CH9102 (COM9) =>
Description : USB-Enhanced-SERIAL CH9102 =>
Name : USB-Enhanced-SERIAL CH9102 (COM9) =>
DeviceID : USB\VID_1A86&PID_55D4\5382020745 =>
PNPDeviceID : USB\VID_1A86&PID_55D4\5382020745 =>
ClassGuid : {4d36e978-e325-11ce-bfc1-08002be10318} =>
CompatibleID : {USB\Class_02&SubClass_02&Prot_01, USB\Class_02&SubClass_02, USB\Class_02} =>
HardwareID : {USB\VID_1A86&PID_55D4&REV_0443, USB\VID_1A86&PID_55D4} =>
Manufacturer : wch.cn =>
PNPClass : Ports =>
Service : CH343SER_A64 =>
Class : Ports =>
FriendlyName : USB-Enhanced-SERIAL CH9102 (COM9) =>
InstanceId : USB\VID_1A86&PID_55D4\5382020745 =>
InstallDate : =>
Status : OK =>
Availability : =>
ConfigManagerErrorCode : CM_PROB_NONE =>
ConfigManagerUserConfig : False =>
CreationClassName : Win32_PnPEntity =>
ErrorCleared : =>
ErrorDescription : =>
LastErrorCode : =>
PowerManagementCapabilities : =>
PowerManagementSupported : =>
StatusInfo : =>
SystemCreationClassName : Win32_ComputerSystem =>
SystemName : DESKTOP-FRFQN8H =>
Present : True =>
PSComputerName : =>
Problem : CM_PROB_NONE =>
ProblemDescription : =>
=>

View File

@@ -119,6 +119,10 @@ def setPref(attributes, name, valStr):
val = meshtastic.util.fromStr(valStr) val = meshtastic.util.fromStr(valStr)
logging.debug(f'valStr:{valStr} val:{val}') logging.debug(f'valStr:{valStr} val:{val}')
if snake_name == 'wifi_password' and len(valStr) < 8:
print(f"Warning: wifi_password must be 8 or more characters.")
return
enumType = field.enum_type enumType = field.enum_type
# pylint: disable=C0123 # pylint: disable=C0123
if enumType and type(val) == str: if enumType and type(val) == str:
@@ -200,6 +204,12 @@ def onConnected(interface):
print(f"Setting device owner short to {args.set_owner_short}") print(f"Setting device owner short to {args.set_owner_short}")
interface.getNode(args.dest).setOwner(long_name=None, short_name=args.set_owner_short) interface.getNode(args.dest).setOwner(long_name=None, short_name=args.set_owner_short)
# TODO: add to export-config and configure
if args.set_canned_message:
closeNow = True
print(f"Setting canned plugin message to {args.set_canned_message}")
interface.getNode(args.dest).set_canned_message(args.set_canned_message)
if args.pos_fields: if args.pos_fields:
# If --pos-fields invoked with args, set position fields # If --pos-fields invoked with args, set position fields
closeNow = True closeNow = True
@@ -510,6 +520,11 @@ def onConnected(interface):
print(f"Writing modified channels to device") print(f"Writing modified channels to device")
interface.getNode(args.dest).writeChannel(channelIndex) interface.getNode(args.dest).writeChannel(channelIndex)
if args.get_canned_message:
closeNow = True
print("")
interface.getNode(args.dest).get_canned_message()
if args.info: if args.info:
print("") print("")
# If we aren't trying to talk to our local node, don't show it # If we aren't trying to talk to our local node, don't show it
@@ -746,6 +761,9 @@ def initParser():
parser.add_argument("--info", help="Read and display the radio config information", parser.add_argument("--info", help="Read and display the radio config information",
action="store_true") action="store_true")
parser.add_argument("--get-canned-message", help="Show the canned message plugin message",
action="store_true")
parser.add_argument("--nodes", help="Print Node List in a pretty formatted table", parser.add_argument("--nodes", help="Print Node List in a pretty formatted table",
action="store_true") action="store_true")
@@ -808,6 +826,9 @@ def initParser():
parser.add_argument( parser.add_argument(
"--set-owner", help="Set device owner name", action="store") "--set-owner", help="Set device owner name", action="store")
parser.add_argument(
"--set-canned-message", help="Set the canned messages plugin message (up to 1000 characters).", action="store")
parser.add_argument( parser.add_argument(
"--set-owner-short", help="Set device owner short name", action="store") "--set-owner-short", help="Set device owner short name", action="store")

View File

@@ -12,13 +12,12 @@ from google.protobuf import symbol_database as _symbol_database
_sym_db = _symbol_database.Default() _sym_db = _symbol_database.Default()
from . import cannedmessages_pb2 as cannedmessages__pb2
from . import channel_pb2 as channel__pb2 from . import channel_pb2 as channel__pb2
from . import mesh_pb2 as mesh__pb2 from . import mesh_pb2 as mesh__pb2
from . import radioconfig_pb2 as radioconfig__pb2 from . import radioconfig_pb2 as radioconfig__pb2
DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x0b\x61\x64min.proto\x1a\x14\x63\x61nnedmessages.proto\x1a\rchannel.proto\x1a\nmesh.proto\x1a\x11radioconfig.proto\"\x87\x0c\n\x0c\x41\x64minMessage\x12!\n\tset_radio\x18\x01 \x01(\x0b\x32\x0c.RadioConfigH\x00\x12\x1a\n\tset_owner\x18\x02 \x01(\x0b\x32\x05.UserH\x00\x12\x1f\n\x0bset_channel\x18\x03 \x01(\x0b\x32\x08.ChannelH\x00\x12\x1b\n\x11get_radio_request\x18\x04 \x01(\x08H\x00\x12*\n\x12get_radio_response\x18\x05 \x01(\x0b\x32\x0c.RadioConfigH\x00\x12\x1d\n\x13get_channel_request\x18\x06 \x01(\rH\x00\x12(\n\x14get_channel_response\x18\x07 \x01(\x0b\x32\x08.ChannelH\x00\x12\x1b\n\x11get_owner_request\x18\x08 \x01(\x08H\x00\x12#\n\x12get_owner_response\x18\t \x01(\x0b\x32\x05.UserH\x00\x12\x1d\n\x13\x63onfirm_set_channel\x18 \x01(\x08H\x00\x12\x1b\n\x11\x63onfirm_set_radio\x18! \x01(\x08H\x00\x12\x18\n\x0e\x65xit_simulator\x18\" \x01(\x08H\x00\x12\x18\n\x0ereboot_seconds\x18# \x01(\x05H\x00\x12\x31\n\'get_canned_message_plugin_part1_request\x18$ \x01(\x08H\x00\x12T\n(get_canned_message_plugin_part1_response\x18% \x01(\x0b\x32 .CannedMessagePluginMessagePart1H\x00\x12\x31\n\'get_canned_message_plugin_part2_request\x18& \x01(\x08H\x00\x12T\n(get_canned_message_plugin_part2_response\x18\' \x01(\x0b\x32 .CannedMessagePluginMessagePart2H\x00\x12\x31\n\'get_canned_message_plugin_part3_request\x18( \x01(\x08H\x00\x12T\n(get_canned_message_plugin_part3_response\x18) \x01(\x0b\x32 .CannedMessagePluginMessagePart3H\x00\x12\x31\n\'get_canned_message_plugin_part4_request\x18* \x01(\x08H\x00\x12T\n(get_canned_message_plugin_part4_response\x18+ \x01(\x0b\x32 .CannedMessagePluginMessagePart4H\x00\x12\x31\n\'get_canned_message_plugin_part5_request\x18, \x01(\x08H\x00\x12T\n(get_canned_message_plugin_part5_response\x18- \x01(\x0b\x32 .CannedMessagePluginMessagePart5H\x00\x12K\n\x1fset_canned_message_plugin_part1\x18. \x01(\x0b\x32 .CannedMessagePluginMessagePart1H\x00\x12K\n\x1fset_canned_message_plugin_part2\x18/ \x01(\x0b\x32 .CannedMessagePluginMessagePart2H\x00\x12K\n\x1fset_canned_message_plugin_part3\x18\x30 \x01(\x0b\x32 .CannedMessagePluginMessagePart3H\x00\x12K\n\x1fset_canned_message_plugin_part4\x18\x31 \x01(\x0b\x32 .CannedMessagePluginMessagePart4H\x00\x12K\n\x1fset_canned_message_plugin_part5\x18\x32 \x01(\x0b\x32 .CannedMessagePluginMessagePart5H\x00\x12\x1a\n\x10shutdown_seconds\x18\x33 \x01(\x05H\x00\x42\t\n\x07variantBG\n\x13\x63om.geeksville.meshB\x0b\x41\x64minProtosH\x03Z!github.com/meshtastic/gomeshprotob\x06proto3') DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x0b\x61\x64min.proto\x1a\rchannel.proto\x1a\nmesh.proto\x1a\x11radioconfig.proto\"\xa1\x08\n\x0c\x41\x64minMessage\x12!\n\tset_radio\x18\x01 \x01(\x0b\x32\x0c.RadioConfigH\x00\x12\x1a\n\tset_owner\x18\x02 \x01(\x0b\x32\x05.UserH\x00\x12\x1f\n\x0bset_channel\x18\x03 \x01(\x0b\x32\x08.ChannelH\x00\x12\x1b\n\x11get_radio_request\x18\x04 \x01(\x08H\x00\x12*\n\x12get_radio_response\x18\x05 \x01(\x0b\x32\x0c.RadioConfigH\x00\x12\x1d\n\x13get_channel_request\x18\x06 \x01(\rH\x00\x12(\n\x14get_channel_response\x18\x07 \x01(\x0b\x32\x08.ChannelH\x00\x12\x1b\n\x11get_owner_request\x18\x08 \x01(\x08H\x00\x12#\n\x12get_owner_response\x18\t \x01(\x0b\x32\x05.UserH\x00\x12\x1d\n\x13\x63onfirm_set_channel\x18 \x01(\x08H\x00\x12\x1b\n\x11\x63onfirm_set_radio\x18! \x01(\x08H\x00\x12\x18\n\x0e\x65xit_simulator\x18\" \x01(\x08H\x00\x12\x18\n\x0ereboot_seconds\x18# \x01(\x05H\x00\x12\x31\n\'get_canned_message_plugin_part1_request\x18$ \x01(\x08H\x00\x12\x32\n(get_canned_message_plugin_part1_response\x18% \x01(\tH\x00\x12\x31\n\'get_canned_message_plugin_part2_request\x18& \x01(\x08H\x00\x12\x32\n(get_canned_message_plugin_part2_response\x18\' \x01(\tH\x00\x12\x31\n\'get_canned_message_plugin_part3_request\x18( \x01(\x08H\x00\x12\x32\n(get_canned_message_plugin_part3_response\x18) \x01(\tH\x00\x12\x31\n\'get_canned_message_plugin_part4_request\x18* \x01(\x08H\x00\x12\x32\n(get_canned_message_plugin_part4_response\x18+ \x01(\tH\x00\x12)\n\x1fset_canned_message_plugin_part1\x18, \x01(\tH\x00\x12)\n\x1fset_canned_message_plugin_part2\x18- \x01(\tH\x00\x12)\n\x1fset_canned_message_plugin_part3\x18. \x01(\tH\x00\x12)\n\x1fset_canned_message_plugin_part4\x18/ \x01(\tH\x00\x12\x1a\n\x10shutdown_seconds\x18\x33 \x01(\x05H\x00\x42\t\n\x07variantBG\n\x13\x63om.geeksville.meshB\x0b\x41\x64minProtosH\x03Z!github.com/meshtastic/gomeshprotob\x06proto3')
@@ -34,6 +33,6 @@ if _descriptor._USE_C_DESCRIPTORS == False:
DESCRIPTOR._options = None DESCRIPTOR._options = None
DESCRIPTOR._serialized_options = b'\n\023com.geeksville.meshB\013AdminProtosH\003Z!github.com/meshtastic/gomeshproto' DESCRIPTOR._serialized_options = b'\n\023com.geeksville.meshB\013AdminProtosH\003Z!github.com/meshtastic/gomeshproto'
_ADMINMESSAGE._serialized_start=84 _ADMINMESSAGE._serialized_start=62
_ADMINMESSAGE._serialized_end=1627 _ADMINMESSAGE._serialized_end=1119
# @@protoc_insertion_point(module_scope) # @@protoc_insertion_point(module_scope)

View File

@@ -14,62 +14,22 @@ _sym_db = _symbol_database.Default()
DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x14\x63\x61nnedmessages.proto\"/\n\x1f\x43\x61nnedMessagePluginMessagePart1\x12\x0c\n\x04text\x18\x01 \x01(\t\"/\n\x1f\x43\x61nnedMessagePluginMessagePart2\x12\x0c\n\x04text\x18\x01 \x01(\t\"/\n\x1f\x43\x61nnedMessagePluginMessagePart3\x12\x0c\n\x04text\x18\x01 \x01(\t\"/\n\x1f\x43\x61nnedMessagePluginMessagePart4\x12\x0c\n\x04text\x18\x01 \x01(\t\"/\n\x1f\x43\x61nnedMessagePluginMessagePart5\x12\x0c\n\x04text\x18\x01 \x01(\tBU\n\x13\x63om.geeksville.meshB\x19\x43\x61nnedMessageConfigProtosH\x03Z!github.com/meshtastic/gomeshprotob\x06proto3') DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x14\x63\x61nnedmessages.proto\"w\n\x19\x43\x61nnedMessagePluginConfig\x12\x15\n\rmessagesPart1\x18\x0b \x01(\t\x12\x15\n\rmessagesPart2\x18\x0c \x01(\t\x12\x15\n\rmessagesPart3\x18\r \x01(\t\x12\x15\n\rmessagesPart4\x18\x0e \x01(\tBU\n\x13\x63om.geeksville.meshB\x19\x43\x61nnedMessageConfigProtosH\x03Z!github.com/meshtastic/gomeshprotob\x06proto3')
_CANNEDMESSAGEPLUGINMESSAGEPART1 = DESCRIPTOR.message_types_by_name['CannedMessagePluginMessagePart1'] _CANNEDMESSAGEPLUGINCONFIG = DESCRIPTOR.message_types_by_name['CannedMessagePluginConfig']
_CANNEDMESSAGEPLUGINMESSAGEPART2 = DESCRIPTOR.message_types_by_name['CannedMessagePluginMessagePart2'] CannedMessagePluginConfig = _reflection.GeneratedProtocolMessageType('CannedMessagePluginConfig', (_message.Message,), {
_CANNEDMESSAGEPLUGINMESSAGEPART3 = DESCRIPTOR.message_types_by_name['CannedMessagePluginMessagePart3'] 'DESCRIPTOR' : _CANNEDMESSAGEPLUGINCONFIG,
_CANNEDMESSAGEPLUGINMESSAGEPART4 = DESCRIPTOR.message_types_by_name['CannedMessagePluginMessagePart4']
_CANNEDMESSAGEPLUGINMESSAGEPART5 = DESCRIPTOR.message_types_by_name['CannedMessagePluginMessagePart5']
CannedMessagePluginMessagePart1 = _reflection.GeneratedProtocolMessageType('CannedMessagePluginMessagePart1', (_message.Message,), {
'DESCRIPTOR' : _CANNEDMESSAGEPLUGINMESSAGEPART1,
'__module__' : 'cannedmessages_pb2' '__module__' : 'cannedmessages_pb2'
# @@protoc_insertion_point(class_scope:CannedMessagePluginMessagePart1) # @@protoc_insertion_point(class_scope:CannedMessagePluginConfig)
}) })
_sym_db.RegisterMessage(CannedMessagePluginMessagePart1) _sym_db.RegisterMessage(CannedMessagePluginConfig)
CannedMessagePluginMessagePart2 = _reflection.GeneratedProtocolMessageType('CannedMessagePluginMessagePart2', (_message.Message,), {
'DESCRIPTOR' : _CANNEDMESSAGEPLUGINMESSAGEPART2,
'__module__' : 'cannedmessages_pb2'
# @@protoc_insertion_point(class_scope:CannedMessagePluginMessagePart2)
})
_sym_db.RegisterMessage(CannedMessagePluginMessagePart2)
CannedMessagePluginMessagePart3 = _reflection.GeneratedProtocolMessageType('CannedMessagePluginMessagePart3', (_message.Message,), {
'DESCRIPTOR' : _CANNEDMESSAGEPLUGINMESSAGEPART3,
'__module__' : 'cannedmessages_pb2'
# @@protoc_insertion_point(class_scope:CannedMessagePluginMessagePart3)
})
_sym_db.RegisterMessage(CannedMessagePluginMessagePart3)
CannedMessagePluginMessagePart4 = _reflection.GeneratedProtocolMessageType('CannedMessagePluginMessagePart4', (_message.Message,), {
'DESCRIPTOR' : _CANNEDMESSAGEPLUGINMESSAGEPART4,
'__module__' : 'cannedmessages_pb2'
# @@protoc_insertion_point(class_scope:CannedMessagePluginMessagePart4)
})
_sym_db.RegisterMessage(CannedMessagePluginMessagePart4)
CannedMessagePluginMessagePart5 = _reflection.GeneratedProtocolMessageType('CannedMessagePluginMessagePart5', (_message.Message,), {
'DESCRIPTOR' : _CANNEDMESSAGEPLUGINMESSAGEPART5,
'__module__' : 'cannedmessages_pb2'
# @@protoc_insertion_point(class_scope:CannedMessagePluginMessagePart5)
})
_sym_db.RegisterMessage(CannedMessagePluginMessagePart5)
if _descriptor._USE_C_DESCRIPTORS == False: if _descriptor._USE_C_DESCRIPTORS == False:
DESCRIPTOR._options = None DESCRIPTOR._options = None
DESCRIPTOR._serialized_options = b'\n\023com.geeksville.meshB\031CannedMessageConfigProtosH\003Z!github.com/meshtastic/gomeshproto' DESCRIPTOR._serialized_options = b'\n\023com.geeksville.meshB\031CannedMessageConfigProtosH\003Z!github.com/meshtastic/gomeshproto'
_CANNEDMESSAGEPLUGINMESSAGEPART1._serialized_start=24 _CANNEDMESSAGEPLUGINCONFIG._serialized_start=24
_CANNEDMESSAGEPLUGINMESSAGEPART1._serialized_end=71 _CANNEDMESSAGEPLUGINCONFIG._serialized_end=143
_CANNEDMESSAGEPLUGINMESSAGEPART2._serialized_start=73
_CANNEDMESSAGEPLUGINMESSAGEPART2._serialized_end=120
_CANNEDMESSAGEPLUGINMESSAGEPART3._serialized_start=122
_CANNEDMESSAGEPLUGINMESSAGEPART3._serialized_end=169
_CANNEDMESSAGEPLUGINMESSAGEPART4._serialized_start=171
_CANNEDMESSAGEPLUGINMESSAGEPART4._serialized_end=218
_CANNEDMESSAGEPLUGINMESSAGEPART5._serialized_start=220
_CANNEDMESSAGEPLUGINMESSAGEPART5._serialized_end=267
# @@protoc_insertion_point(module_scope) # @@protoc_insertion_point(module_scope)

View File

@@ -17,7 +17,7 @@ from . import mesh_pb2 as mesh__pb2
from . import radioconfig_pb2 as radioconfig__pb2 from . import radioconfig_pb2 as radioconfig__pb2
DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x10\x64\x65viceonly.proto\x1a\rchannel.proto\x1a\nmesh.proto\x1a\x11radioconfig.proto\"\x80\x01\n\x11LegacyRadioConfig\x12\x39\n\x0bpreferences\x18\x01 \x01(\x0b\x32$.LegacyRadioConfig.LegacyPreferences\x1a\x30\n\x11LegacyPreferences\x12\x1b\n\x06region\x18\x0f \x01(\x0e\x32\x0b.RegionCode\"\xf0\x03\n\x0b\x44\x65viceState\x12\'\n\x0blegacyRadio\x18\x01 \x01(\x0b\x32\x12.LegacyRadioConfig\x12\x1c\n\x07my_node\x18\x02 \x01(\x0b\x32\x0b.MyNodeInfo\x12\x14\n\x05owner\x18\x03 \x01(\x0b\x32\x05.User\x12\x1a\n\x07node_db\x18\x04 \x03(\x0b\x32\t.NodeInfo\x12\"\n\rreceive_queue\x18\x05 \x03(\x0b\x32\x0b.MeshPacket\x12\x0f\n\x07version\x18\x08 \x01(\r\x12$\n\x0frx_text_message\x18\x07 \x01(\x0b\x32\x0b.MeshPacket\x12\x0f\n\x07no_save\x18\t \x01(\x08\x12\x15\n\rdid_gps_reset\x18\x0b \x01(\x08\x12+\n#canned_message_plugin_message_part1\x18\r \x01(\t\x12+\n#canned_message_plugin_message_part2\x18\x0e \x01(\t\x12+\n#canned_message_plugin_message_part3\x18\x0f \x01(\t\x12+\n#canned_message_plugin_message_part4\x18\x10 \x01(\t\x12+\n#canned_message_plugin_message_part5\x18\x11 \x01(\tJ\x04\x08\x0c\x10\r\")\n\x0b\x43hannelFile\x12\x1a\n\x08\x63hannels\x18\x01 \x03(\x0b\x32\x08.ChannelBF\n\x13\x63om.geeksville.meshB\nDeviceOnlyH\x03Z!github.com/meshtastic/gomeshprotob\x06proto3') DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x10\x64\x65viceonly.proto\x1a\rchannel.proto\x1a\nmesh.proto\x1a\x11radioconfig.proto\"\x80\x01\n\x11LegacyRadioConfig\x12\x39\n\x0bpreferences\x18\x01 \x01(\x0b\x32$.LegacyRadioConfig.LegacyPreferences\x1a\x30\n\x11LegacyPreferences\x12\x1b\n\x06region\x18\x0f \x01(\x0e\x32\x0b.RegionCode\"\x8f\x02\n\x0b\x44\x65viceState\x12\'\n\x0blegacyRadio\x18\x01 \x01(\x0b\x32\x12.LegacyRadioConfig\x12\x1c\n\x07my_node\x18\x02 \x01(\x0b\x32\x0b.MyNodeInfo\x12\x14\n\x05owner\x18\x03 \x01(\x0b\x32\x05.User\x12\x1a\n\x07node_db\x18\x04 \x03(\x0b\x32\t.NodeInfo\x12\"\n\rreceive_queue\x18\x05 \x03(\x0b\x32\x0b.MeshPacket\x12\x0f\n\x07version\x18\x08 \x01(\r\x12$\n\x0frx_text_message\x18\x07 \x01(\x0b\x32\x0b.MeshPacket\x12\x0f\n\x07no_save\x18\t \x01(\x08\x12\x15\n\rdid_gps_reset\x18\x0b \x01(\x08J\x04\x08\x0c\x10\r\")\n\x0b\x43hannelFile\x12\x1a\n\x08\x63hannels\x18\x01 \x03(\x0b\x32\x08.ChannelBF\n\x13\x63om.geeksville.meshB\nDeviceOnlyH\x03Z!github.com/meshtastic/gomeshprotob\x06proto3')
@@ -63,7 +63,7 @@ if _descriptor._USE_C_DESCRIPTORS == False:
_LEGACYRADIOCONFIG_LEGACYPREFERENCES._serialized_start=147 _LEGACYRADIOCONFIG_LEGACYPREFERENCES._serialized_start=147
_LEGACYRADIOCONFIG_LEGACYPREFERENCES._serialized_end=195 _LEGACYRADIOCONFIG_LEGACYPREFERENCES._serialized_end=195
_DEVICESTATE._serialized_start=198 _DEVICESTATE._serialized_start=198
_DEVICESTATE._serialized_end=694 _DEVICESTATE._serialized_end=469
_CHANNELFILE._serialized_start=696 _CHANNELFILE._serialized_start=471
_CHANNELFILE._serialized_end=737 _CHANNELFILE._serialized_end=512
# @@protoc_insertion_point(module_scope) # @@protoc_insertion_point(module_scope)

View File

@@ -254,11 +254,12 @@ class MeshInterface:
meshPacket.decoded.payload = data meshPacket.decoded.payload = data
meshPacket.decoded.portnum = portNum meshPacket.decoded.portnum = portNum
meshPacket.decoded.want_response = wantResponse meshPacket.decoded.want_response = wantResponse
meshPacket.id = self._generatePacketId()
if onResponse is not None:
self._addResponseHandler(meshPacket.id, onResponse)
p = self._sendPacket(meshPacket, destinationId, p = self._sendPacket(meshPacket, destinationId,
wantAck=wantAck, hopLimit=hopLimit) wantAck=wantAck, hopLimit=hopLimit)
if onResponse is not None:
self._addResponseHandler(p.id, onResponse)
return p return p
def sendPosition(self, latitude=0.0, longitude=0.0, altitude=0, timeSec=0, def sendPosition(self, latitude=0.0, longitude=0.0, altitude=0, timeSec=0,
@@ -631,11 +632,13 @@ class MeshInterface:
# asObj = DotMap(asDict) # asObj = DotMap(asDict)
topic = "meshtastic.receive" # Generic unknown packet type topic = "meshtastic.receive" # Generic unknown packet type
decoded = asDict["decoded"] decoded = None
# The default MessageToDict converts byte arrays into base64 strings. if 'decoded' in asDict:
# We don't want that - it messes up data payload. So slam in the correct decoded = asDict["decoded"]
# byte array. # The default MessageToDict converts byte arrays into base64 strings.
decoded["payload"] = meshPacket.decoded.payload # We don't want that - it messes up data payload. So slam in the correct
# byte array.
decoded["payload"] = meshPacket.decoded.payload
# UNKNOWN_APP is the default protobuf portnum value, and therefore if not # UNKNOWN_APP is the default protobuf portnum value, and therefore if not
# set it will not be populated at all to make API usage easier, set # set it will not be populated at all to make API usage easier, set

View File

File diff suppressed because one or more lines are too long

View File

@@ -3,6 +3,7 @@
import logging import logging
import base64 import base64
import time
from google.protobuf.json_format import MessageToJson from google.protobuf.json_format import MessageToJson
from meshtastic import portnums_pb2, apponly_pb2, admin_pb2, channel_pb2 from meshtastic import portnums_pb2, apponly_pb2, admin_pb2, channel_pb2
from meshtastic.util import pskToString, stripnl, Timeout, our_exit, fromPSK from meshtastic.util import pskToString, stripnl, Timeout, our_exit, fromPSK
@@ -24,6 +25,15 @@ class Node:
self.partialChannels = None self.partialChannels = None
self.noProto = noProto self.noProto = noProto
self.cannedPluginMessage = None
self.cannedPluginMessagePart1 = None
self.cannedPluginMessagePart2 = None
self.cannedPluginMessagePart3 = None
self.cannedPluginMessagePart4 = None
self.gotResponse = None
def showChannels(self): def showChannels(self):
"""Show human readable description of our channels.""" """Show human readable description of our channels."""
print("Channels:") print("Channels:")
@@ -56,6 +66,14 @@ class Node:
self.channels = None self.channels = None
self.partialChannels = [] # We keep our channels in a temp array until finished self.partialChannels = [] # We keep our channels in a temp array until finished
# Note: We do not get the canned plugin message, unless get_canned_message() is called
self.cannedPluginMessage = None
self.cannedPluginMessagePart1 = None
self.cannedPluginMessagePart2 = None
self.cannedPluginMessagePart3 = None
self.cannedPluginMessagePart4 = None
self._requestSettings() self._requestSettings()
def turnOffEncryptionOnPrimaryChannel(self): def turnOffEncryptionOnPrimaryChannel(self):
@@ -64,9 +82,9 @@ class Node:
print("Writing modified channels to device") print("Writing modified channels to device")
self.writeChannel(0) self.writeChannel(0)
def waitForConfig(self): def waitForConfig(self, attribute='channels'):
"""Block until radio config is received. Returns True if config has been received.""" """Block until radio config is received. Returns True if config has been received."""
return self._timeout.waitForSet(self, attrs=('radioConfig', 'channels')) return self._timeout.waitForSet(self, attrs=('radioConfig', attribute))
def writeConfig(self): def writeConfig(self):
"""Write the current (edited) radioConfig to the device""" """Write the current (edited) radioConfig to the device"""
@@ -237,7 +255,7 @@ class Node:
"""Handle the response packet for requesting settings _requestSettings()""" """Handle the response packet for requesting settings _requestSettings()"""
logging.debug(f'onResponseRequestSetting() p:{p}') logging.debug(f'onResponseRequestSetting() p:{p}')
errorFound = False errorFound = False
if 'routing' in p["decoded"]: if "routing" in p["decoded"]:
if p["decoded"]["routing"]["errorReason"] != "NONE": if p["decoded"]["routing"]["errorReason"] != "NONE":
errorFound = True errorFound = True
print(f'Error on response: {p["decoded"]["routing"]["errorReason"]}') print(f'Error on response: {p["decoded"]["routing"]["errorReason"]}')
@@ -268,6 +286,156 @@ class Node:
return self._sendAdmin(p, wantResponse=True, onResponse=self.onResponseRequestSettings) return self._sendAdmin(p, wantResponse=True, onResponse=self.onResponseRequestSettings)
def onResponseRequestCannedMessagePluginMessagePart1(self, p):
"""Handle the response packet for requesting canned message plugin message part 1"""
logging.debug(f'onResponseRequestCannedMessagePluginMessagePart1() p:{p}')
errorFound = False
if "routing" in p["decoded"]:
if p["decoded"]["routing"]["errorReason"] != "NONE":
errorFound = True
print(f'Error on response: {p["decoded"]["routing"]["errorReason"]}')
if errorFound is False:
if "decoded" in p:
if "admin" in p["decoded"]:
if "raw" in p["decoded"]["admin"]:
self.cannedPluginMessagePart1 = p["decoded"]["admin"]["raw"].get_canned_message_plugin_part1_response
logging.debug(f'self.cannedPluginMessagePart1:{self.cannedPluginMessagePart1}')
self.gotResponse = True
def onResponseRequestCannedMessagePluginMessagePart2(self, p):
"""Handle the response packet for requesting canned message plugin message part 2"""
logging.debug(f'onResponseRequestCannedMessagePluginMessagePart2() p:{p}')
errorFound = False
if "routing" in p["decoded"]:
if p["decoded"]["routing"]["errorReason"] != "NONE":
errorFound = True
print(f'Error on response: {p["decoded"]["routing"]["errorReason"]}')
if errorFound is False:
if "decoded" in p:
if "admin" in p["decoded"]:
if "raw" in p["decoded"]["admin"]:
self.cannedPluginMessagePart2 = p["decoded"]["admin"]["raw"].get_canned_message_plugin_part2_response
logging.debug(f'self.cannedPluginMessagePart2:{self.cannedPluginMessagePart2}')
self.gotResponse = True
def onResponseRequestCannedMessagePluginMessagePart3(self, p):
"""Handle the response packet for requesting canned message plugin message part 3"""
logging.debug(f'onResponseRequestCannedMessagePluginMessagePart3() p:{p}')
errorFound = False
if "routing" in p["decoded"]:
if p["decoded"]["routing"]["errorReason"] != "NONE":
errorFound = True
print(f'Error on response: {p["decoded"]["routing"]["errorReason"]}')
if errorFound is False:
if "decoded" in p:
if "admin" in p["decoded"]:
if "raw" in p["decoded"]["admin"]:
self.cannedPluginMessagePart3 = p["decoded"]["admin"]["raw"].get_canned_message_plugin_part3_response
logging.debug(f'self.cannedPluginMessagePart3:{self.cannedPluginMessagePart3}')
self.gotResponse = True
def onResponseRequestCannedMessagePluginMessagePart4(self, p):
"""Handle the response packet for requesting canned message plugin message part 4"""
logging.debug(f'onResponseRequestCannedMessagePluginMessagePart4() p:{p}')
errorFound = False
if "routing" in p["decoded"]:
if p["decoded"]["routing"]["errorReason"] != "NONE":
errorFound = True
print(f'Error on response: {p["decoded"]["routing"]["errorReason"]}')
if errorFound is False:
if "decoded" in p:
if "admin" in p["decoded"]:
if "raw" in p["decoded"]["admin"]:
self.cannedPluginMessagePart4 = p["decoded"]["admin"]["raw"].get_canned_message_plugin_part4_response
logging.debug(f'self.cannedPluginMessagePart4:{self.cannedPluginMessagePart4}')
self.gotResponse = True
def get_canned_message(self):
"""Get the canned message string. Concatenate all pieces together and return a single string."""
logging.debug(f'in get_canned_message()')
if not self.cannedPluginMessage:
p1 = admin_pb2.AdminMessage()
p1.get_canned_message_plugin_part1_request = True
self.gotResponse = False
self._sendAdmin(p1, wantResponse=True, onResponse=self.onResponseRequestCannedMessagePluginMessagePart1)
while self.gotResponse is False:
time.sleep(0.1)
p2 = admin_pb2.AdminMessage()
p2.get_canned_message_plugin_part2_request = True
self.gotResponse = False
self._sendAdmin(p2, wantResponse=True, onResponse=self.onResponseRequestCannedMessagePluginMessagePart2)
while self.gotResponse is False:
time.sleep(0.1)
p3 = admin_pb2.AdminMessage()
p3.get_canned_message_plugin_part3_request = True
self.gotResponse = False
self._sendAdmin(p3, wantResponse=True, onResponse=self.onResponseRequestCannedMessagePluginMessagePart3)
while self.gotResponse is False:
time.sleep(0.1)
p4 = admin_pb2.AdminMessage()
p4.get_canned_message_plugin_part4_request = True
self.gotResponse = False
self._sendAdmin(p4, wantResponse=True, onResponse=self.onResponseRequestCannedMessagePluginMessagePart4)
while self.gotResponse is False:
time.sleep(0.1)
# TODO: This feels wrong to have a sleep here. Is there a way to ensure that
# all requests are complete? Perhaps change to a while loop any parts are None... maybe?
time.sleep(3)
logging.debug(f'self.cannedPluginMessagePart1:{self.cannedPluginMessagePart1}')
logging.debug(f'self.cannedPluginMessagePart2:{self.cannedPluginMessagePart2}')
logging.debug(f'self.cannedPluginMessagePart3:{self.cannedPluginMessagePart3}')
logging.debug(f'self.cannedPluginMessagePart4:{self.cannedPluginMessagePart4}')
self.cannedPluginMessage = ""
if self.cannedPluginMessagePart1:
self.cannedPluginMessage += self.cannedPluginMessagePart1
if self.cannedPluginMessagePart2:
self.cannedPluginMessage += self.cannedPluginMessagePart2
if self.cannedPluginMessagePart3:
self.cannedPluginMessage += self.cannedPluginMessagePart3
if self.cannedPluginMessagePart4:
self.cannedPluginMessage += self.cannedPluginMessagePart4
print(f'canned_plugin_message:{self.cannedPluginMessage}')
logging.debug(f'canned_plugin_message:{self.cannedPluginMessage}')
return self.cannedPluginMessage
def set_canned_message(self, message):
"""Set the canned message. Split into parts of 200 chars each."""
if len(message) > 800:
our_exit("Warning: The canned message must be less than 800 characters.")
# split into chunks
chunks = []
chunks_size = 200
for i in range(0, len(message), chunks_size):
chunks.append(message[i: i + chunks_size])
# for each chunk, send a message to set the values
#for i in range(0, len(chunks)):
for i, chunk in enumerate(chunks):
p = admin_pb2.AdminMessage()
# TODO: should be a way to improve this
if i == 0:
p.set_canned_message_plugin_part1 = chunk
elif i == 1:
p.set_canned_message_plugin_part2 = chunk
elif i == 2:
p.set_canned_message_plugin_part3 = chunk
elif i == 3:
p.set_canned_message_plugin_part4 = chunk
logging.debug(f"Setting canned message '{chunk}' part {i+1}")
self._sendAdmin(p)
def exitSimulator(self): def exitSimulator(self):
"""Tell a simulator node to exit (this message """Tell a simulator node to exit (this message
is ignored for other nodes)""" is ignored for other nodes)"""

View File

File diff suppressed because one or more lines are too long

View File

@@ -2,10 +2,6 @@
It is used for auto detection as to which device might be connected. It is used for auto detection as to which device might be connected.
""" """
import platform
import subprocess
import re
# Goal is to detect which device and port to use from the supported devices # Goal is to detect which device and port to use from the supported devices
# without installing any libraries that are not currently in the python meshtastic library # without installing any libraries that are not currently in the python meshtastic library
@@ -42,13 +38,13 @@ tbeam_M8N = SupportedDevice(name="T-Beam", version="M8N", for_firmware="tbeam",
tbeam_M8N_SX1262 = SupportedDevice(name="T-Beam", version="M8N_SX1262", for_firmware="tbeam", tbeam_M8N_SX1262 = SupportedDevice(name="T-Beam", version="M8N_SX1262", for_firmware="tbeam",
baseport_on_linux="ttyACM", baseport_on_mac="cu.usbmodem", baseport_on_linux="ttyACM", baseport_on_mac="cu.usbmodem",
usb_vendor_id_in_hex="1a86", usb_product_id_in_hex="55d4") usb_vendor_id_in_hex="1a86", usb_product_id_in_hex="55d4")
tlora_v1_1 = SupportedDevice(name="T-Lora", version="1.1", for_firmware="tlora-v1", tlora_v1 = SupportedDevice(name="T-Lora", version="1", for_firmware="tlora-v1",
baseport_on_linux="ttyUSB", baseport_on_mac="cu.usbserial", baseport_on_linux="ttyUSB", baseport_on_mac="cu.usbserial",
usb_vendor_id_in_hex="10c4", usb_product_id_in_hex="ea60") usb_vendor_id_in_hex="1a86", usb_product_id_in_hex="55d4")
tlora_v1_3 = SupportedDevice(name="T-Lora", version="1.3", for_firmware="tlora-v1-3", tlora_v1_3 = SupportedDevice(name="T-Lora", version="1.3", for_firmware="tlora-v1-3",
baseport_on_linux="ttyUSB", baseport_on_mac="cu.usbserial", baseport_on_linux="ttyUSB", baseport_on_mac="cu.usbserial",
usb_vendor_id_in_hex="10c4", usb_product_id_in_hex="ea60") usb_vendor_id_in_hex="10c4", usb_product_id_in_hex="ea60")
tlora_v2_0 = SupportedDevice(name="T-Lora", version="2.0", for_firmware="tlora-v2-1", tlora_v2 = SupportedDevice(name="T-Lora", version="2", for_firmware="tlora-v2",
baseport_on_linux="ttyACM", baseport_on_mac="cu.usbmodem", baseport_on_linux="ttyACM", baseport_on_mac="cu.usbmodem",
usb_vendor_id_in_hex="1a86", usb_product_id_in_hex="55d4") usb_vendor_id_in_hex="1a86", usb_product_id_in_hex="55d4")
tlora_v2_1 = SupportedDevice(name="T-Lora", version="2.1", for_firmware="tlora-v2-1", tlora_v2_1 = SupportedDevice(name="T-Lora", version="2.1", for_firmware="tlora-v2-1",
@@ -85,112 +81,12 @@ rak4631_19003 = SupportedDevice(name="RAK 4631 19003", version="", for_firmware=
device_class="nrf52", device_class="nrf52",
baseport_on_linux="ttyACM", baseport_on_mac="cu.usbmodem", baseport_on_linux="ttyACM", baseport_on_mac="cu.usbmodem",
usb_vendor_id_in_hex="239a", usb_product_id_in_hex="8029") usb_vendor_id_in_hex="239a", usb_product_id_in_hex="8029")
nano_g1 = SupportedDevice(name="Nano G1", version="", for_firmware="nano-g1",
baseport_on_linux="ttyACM", baseport_on_mac="cu.usbmodem",
usb_vendor_id_in_hex="1a86", usb_product_id_in_hex="55d4")
supported_devices = [tbeam_v0_7, tbeam_v1_1, tbeam_M8N, tbeam_M8N_SX1262, supported_devices = [tbeam_v0_7, tbeam_v1_1, tbeam_M8N, tbeam_M8N_SX1262,
tlora_v1_1, tlora_v1_3, tlora_v2_0, tlora_v2_1, tlora_v2_1_1_6, tlora_v1, tlora_v1_3, tlora_v2, tlora_v2_1, tlora_v2_1_1_6,
heltec_v1, heltec_v2_0, heltec_v2_1, heltec_v1, heltec_v2_0, heltec_v2_1,
meshtastic_diy_v1, techo_1, rak4631_5005, rak4631_19003, meshtastic_diy_v1, techo_1, rak4631_5005, rak4631_19003,
rak11200] rak11200, nano_g1]
def get_unique_vendor_ids():
"""Return a set of unique vendor ids"""
vids = set()
for d in supported_devices:
if d.usb_vendor_id_in_hex:
vids.add(d.usb_vendor_id_in_hex)
return vids
def get_devices_with_vendor_id(vid):
"""Return a set of unique devices with the vendor id"""
sd = set()
for d in supported_devices:
if d.usb_vendor_id_in_hex == vid:
sd.add(d)
return sd
def active_ports_on_supported_devices(sds):
"""Return a set of active ports based on the supplied supported devices"""
ports = set()
baseports = set()
system = platform.system()
# figure out what possible base ports there are
for d in sds:
if system == "Linux":
baseports.add(d.baseport_on_linux)
elif system == "Darwin":
baseports.add(d.baseport_on_mac)
elif system == "Windows":
baseports.add(d.baseport_on_windows)
for bp in baseports:
if system == "Linux":
# see if we have any devices (ignoring any stderr output)
command = f'ls -al /dev/{bp}* 2> /dev/null'
#print(f'command:{command}')
_, ls_output = subprocess.getstatusoutput(command)
#print(f'ls_output:{ls_output}')
# if we got output, there are ports
if len(ls_output) > 0:
#print('got output')
# for each line of output
lines = ls_output.split('\n')
#print(f'lines:{lines}')
for line in lines:
parts = line.split(' ')
#print(f'parts:{parts}')
port = parts[-1]
#print(f'port:{port}')
ports.add(port)
elif system == "Darwin":
# see if we have any devices (ignoring any stderr output)
command = f'ls -al /dev/{bp}* 2> /dev/null'
#print(f'command:{command}')
_, ls_output = subprocess.getstatusoutput(command)
#print(f'ls_output:{ls_output}')
# if we got output, there are ports
if len(ls_output) > 0:
#print('got output')
# for each line of output
lines = ls_output.split('\n')
#print(f'lines:{lines}')
for line in lines:
parts = line.split(' ')
#print(f'parts:{parts}')
port = parts[-1]
#print(f'port:{port}')
ports.add(port)
elif system == "Windows":
# for each device in supported devices found
for d in sds:
# find the port(s)
com_ports = detect_windows_port(d)
#print(f'com_ports:{com_ports}')
# add all ports
for com_port in com_ports:
ports.add(com_port)
return ports
def detect_windows_port(sd):
"""detect if Windows port"""
ports = set()
if sd:
system = platform.system()
if system == "Windows":
command = ('powershell.exe "[Console]::OutputEncoding = [Text.UTF8Encoding]::UTF8;'
'Get-PnpDevice -PresentOnly | Where-Object{ ($_.DeviceId -like ')
command += f"'*{sd.usb_vendor_id_in_hex.upper()}*'"
command += ')} | Format-List"'
#print(f'command:{command}')
_, sp_output = subprocess.getstatusoutput(command)
#print(f'sp_output:{sp_output}')
p = re.compile(r'\(COM(.*)\)')
for x in p.findall(sp_output):
#print(f'x:{x}')
ports.add(f'COM{x}')
return ports

View File

@@ -442,6 +442,43 @@ def test_main_set_owner_short_to_bob(capsys):
mo.assert_called() mo.assert_called()
@pytest.mark.unit
@pytest.mark.usefixtures("reset_globals")
def test_main_set_canned_messages(capsys):
"""Test --set-canned-message """
sys.argv = ['', '--set-canned-message', 'foo']
Globals.getInstance().set_args(sys.argv)
iface = MagicMock(autospec=SerialInterface)
with patch('meshtastic.serial_interface.SerialInterface', return_value=iface) as mo:
main()
out, err = capsys.readouterr()
assert re.search(r'Connected to radio', out, re.MULTILINE)
assert re.search(r'Setting canned plugin message to foo', out, re.MULTILINE)
assert err == ''
mo.assert_called()
@pytest.mark.unit
@pytest.mark.usefixtures("reset_globals")
def test_main_get_canned_messages(capsys, caplog, iface_with_nodes):
"""Test --get-canned-message """
sys.argv = ['', '--get-canned-message']
Globals.getInstance().set_args(sys.argv)
iface = iface_with_nodes
iface.localNode.cannedPluginMessage = 'foo'
with caplog.at_level(logging.DEBUG):
with patch('meshtastic.serial_interface.SerialInterface', return_value=iface) as mo:
main()
out, err = capsys.readouterr()
assert re.search(r'Connected to radio', out, re.MULTILINE)
assert re.search(r'canned_plugin_message:foo', out, re.MULTILINE)
assert err == ''
mo.assert_called()
@pytest.mark.unit @pytest.mark.unit
@pytest.mark.usefixtures("reset_globals") @pytest.mark.usefixtures("reset_globals")
def test_main_set_ham_to_KI123(capsys): def test_main_set_ham_to_KI123(capsys):
@@ -865,6 +902,27 @@ def test_main_set_valid_wifi_passwd(capsys):
mo.assert_called() mo.assert_called()
@pytest.mark.unit
@pytest.mark.usefixtures("reset_globals")
def test_main_set_invalid_wifi_passwd(capsys):
"""Test --set with an invalid value (password must be 8 or more characters)"""
sys.argv = ['', '--set', 'wifi_password', '1234567']
Globals.getInstance().set_args(sys.argv)
mocked_node = MagicMock(autospec=Node)
iface = MagicMock(autospec=SerialInterface)
iface.getNode.return_value = mocked_node
with patch('meshtastic.serial_interface.SerialInterface', return_value=iface) as mo:
main()
out, err = capsys.readouterr()
assert re.search(r'Connected to radio', out, re.MULTILINE)
assert not re.search(r'Set wifi_password to 1234567', out, re.MULTILINE)
assert re.search(r'Warning: wifi_password must be 8 or more characters.', out, re.MULTILINE)
assert err == ''
mo.assert_called()
@pytest.mark.unit @pytest.mark.unit
@pytest.mark.usefixtures("reset_globals") @pytest.mark.usefixtures("reset_globals")
def test_main_set_valid_camel_case(capsys): def test_main_set_valid_camel_case(capsys):

View File

@@ -11,6 +11,9 @@ from ..serial_interface import SerialInterface
from ..admin_pb2 import AdminMessage from ..admin_pb2 import AdminMessage
from ..channel_pb2 import Channel from ..channel_pb2 import Channel
from ..radioconfig_pb2 import RadioConfig from ..radioconfig_pb2 import RadioConfig
#from ..cannedmessages_pb2 import (CannedMessagePluginMessagePart1, CannedMessagePluginMessagePart2,
# CannedMessagePluginMessagePart3, CannedMessagePluginMessagePart4,
# CannedMessagePluginMessagePart5)
from ..util import Timeout from ..util import Timeout
@@ -43,6 +46,122 @@ def test_node_requestConfig(capsys):
assert err == '' assert err == ''
#@pytest.mark.unit
#def test_node_get_canned_message_with_all_parts(capsys):
# """Test run get_canned_message()"""
# iface = MagicMock(autospec=SerialInterface)
# amesg = MagicMock(autospec=AdminMessage)
# with patch('meshtastic.serial_interface.SerialInterface', return_value=iface) as mo:
# with patch('meshtastic.admin_pb2.AdminMessage', return_value=amesg):
# # we have a sleep in this method, so override it so it goes fast
# with patch('time.sleep'):
# anode = Node(mo, 'bar')
# anode.cannedPluginMessagePart1 = 'a'
# anode.cannedPluginMessagePart2 = 'b'
# anode.cannedPluginMessagePart3 = 'c'
# anode.cannedPluginMessagePart4 = 'd'
# anode.cannedPluginMessagePart5 = 'e'
# anode.get_canned_message()
# out, err = capsys.readouterr()
# assert re.search(r'canned_plugin_message:abcde', out, re.MULTILINE)
# assert err == ''
#
#
#@pytest.mark.unit
#def test_node_get_canned_message_with_some_parts(capsys):
# """Test run get_canned_message()"""
# iface = MagicMock(autospec=SerialInterface)
# amesg = MagicMock(autospec=AdminMessage)
# with patch('meshtastic.serial_interface.SerialInterface', return_value=iface) as mo:
# with patch('meshtastic.admin_pb2.AdminMessage', return_value=amesg):
# # we have a sleep in this method, so override it so it goes fast
# with patch('time.sleep'):
# anode = Node(mo, 'bar')
# anode.cannedPluginMessagePart1 = 'a'
# anode.get_canned_message()
# out, err = capsys.readouterr()
# assert re.search(r'canned_plugin_message:a', out, re.MULTILINE)
# assert err == ''
#
#
#@pytest.mark.unit
#def test_node_set_canned_message_one_part(caplog):
# """Test run set_canned_message()"""
# iface = MagicMock(autospec=SerialInterface)
# amesg = MagicMock(autospec=AdminMessage)
# with caplog.at_level(logging.DEBUG):
# with patch('meshtastic.serial_interface.SerialInterface', return_value=iface) as mo:
# with patch('meshtastic.admin_pb2.AdminMessage', return_value=amesg):
# anode = Node(mo, 'bar')
# anode.set_canned_message('foo')
# assert re.search(r"Setting canned message 'foo' part 1", caplog.text, re.MULTILINE)
# assert not re.search(r"Setting canned message '' part 2", caplog.text, re.MULTILINE)
#
#
#@pytest.mark.unit
#def test_node_set_canned_message_200(caplog):
# """Test run set_canned_message() 200 characters long"""
# iface = MagicMock(autospec=SerialInterface)
# amesg = MagicMock(autospec=AdminMessage)
# with caplog.at_level(logging.DEBUG):
# with patch('meshtastic.serial_interface.SerialInterface', return_value=iface) as mo:
# with patch('meshtastic.admin_pb2.AdminMessage', return_value=amesg):
# anode = Node(mo, 'bar')
# message_200_chars_long = 'a' * 200
# anode.set_canned_message(message_200_chars_long)
# assert re.search(r" part 1", caplog.text, re.MULTILINE)
# assert not re.search(r"Setting canned message '' part 2", caplog.text, re.MULTILINE)
#
#
#@pytest.mark.unit
#def test_node_set_canned_message_201(caplog):
# """Test run set_canned_message() 201 characters long"""
# iface = MagicMock(autospec=SerialInterface)
# amesg = MagicMock(autospec=AdminMessage)
# with caplog.at_level(logging.DEBUG):
# with patch('meshtastic.serial_interface.SerialInterface', return_value=iface) as mo:
# with patch('meshtastic.admin_pb2.AdminMessage', return_value=amesg):
# anode = Node(mo, 'bar')
# message_201_chars_long = 'a' * 201
# anode.set_canned_message(message_201_chars_long)
# assert re.search(r" part 1", caplog.text, re.MULTILINE)
# assert re.search(r"Setting canned message 'a' part 2", caplog.text, re.MULTILINE)
#
#
#@pytest.mark.unit
#def test_node_set_canned_message_1000(caplog):
# """Test run set_canned_message() 1000 characters long"""
# iface = MagicMock(autospec=SerialInterface)
# amesg = MagicMock(autospec=AdminMessage)
# with caplog.at_level(logging.DEBUG):
# with patch('meshtastic.serial_interface.SerialInterface', return_value=iface) as mo:
# with patch('meshtastic.admin_pb2.AdminMessage', return_value=amesg):
# anode = Node(mo, 'bar')
# message_1000_chars_long = 'a' * 1000
# anode.set_canned_message(message_1000_chars_long)
# assert re.search(r" part 1", caplog.text, re.MULTILINE)
# assert re.search(r" part 2", caplog.text, re.MULTILINE)
# assert re.search(r" part 3", caplog.text, re.MULTILINE)
# assert re.search(r" part 4", caplog.text, re.MULTILINE)
# assert re.search(r" part 5", caplog.text, re.MULTILINE)
#
#
#@pytest.mark.unit
#def test_node_set_canned_message_1001(capsys):
# """Test run set_canned_message() 1001 characters long"""
# iface = MagicMock(autospec=SerialInterface)
# with pytest.raises(SystemExit) as pytest_wrapped_e:
# with patch('meshtastic.serial_interface.SerialInterface', return_value=iface) as mo:
# anode = Node(mo, 'bar')
# message_1001_chars_long = 'a' * 1001
# anode.set_canned_message(message_1001_chars_long)
# assert pytest_wrapped_e.type == SystemExit
# assert pytest_wrapped_e.value.code == 1
# out, err = capsys.readouterr()
# assert re.search(r'Warning: The canned message', out, re.MULTILINE)
# assert err == ''
@pytest.mark.unit @pytest.mark.unit
def test_setOwner_and_team(caplog): def test_setOwner_and_team(caplog):
"""Test setOwner""" """Test setOwner"""
@@ -694,6 +813,352 @@ def test_requestChannel_localNode(caplog):
assert not re.search(r'from remote node', caplog.text, re.MULTILINE) assert not re.search(r'from remote node', caplog.text, re.MULTILINE)
#@pytest.mark.unit
#def test_onResponseRequestCannedMessagePluginMesagePart1(caplog):
# """Test onResponseRequestCannedMessagePluginMessagePart1()"""
#
# part1 = CannedMessagePluginMessagePart1()
# part1.text = 'foo1'
#
# msg1 = MagicMock(autospec=AdminMessage)
# msg1.get_canned_message_plugin_part1_response = part1
#
# packet = {
# 'from': 682968612,
# 'to': 682968612,
# 'decoded': {
# 'portnum': 'ADMIN_APP',
# 'payload': 'faked',
# 'requestId': 927039000,
# 'admin': {
# 'getCannedMessagePluginPart1Response': {'text': 'foo1'},
# 'raw': msg1
# }
# },
# 'id': 589440320,
# 'rxTime': 1642710843,
# 'hopLimit': 3,
# 'priority': 'RELIABLE',
# 'raw': 'faked',
# 'fromId': '!28b54624',
# 'toId': '!28b54624'
# }
#
# iface = MagicMock(autospec=SerialInterface)
# with patch('meshtastic.serial_interface.SerialInterface', return_value=iface) as mo:
# anode = Node(mo, 'bar', noProto=True)
# # Note: Have to do this next line because every call to MagicMock object/method returns a new magic mock
# mo.localNode = anode
#
# with caplog.at_level(logging.DEBUG):
# anode.onResponseRequestCannedMessagePluginMessagePart1(packet)
# assert re.search(r'onResponseRequestCannedMessagePluginMessagePart1', caplog.text, re.MULTILINE)
# assert anode.cannedPluginMessagePart1 == 'foo1'
#@pytest.mark.unit
#def test_onResponseRequestCannedMessagePluginMesagePart2(caplog):
# """Test onResponseRequestCannedMessagePluginMessagePart2()"""
#
# part2 = CannedMessagePluginMessagePart2()
# part2.text = 'foo2'
#
# msg2 = MagicMock(autospec=AdminMessage)
# msg2.get_canned_message_plugin_part2_response = part2
#
# packet = {
# 'from': 682968612,
# 'to': 682968612,
# 'decoded': {
# 'portnum': 'ADMIN_APP',
# 'payload': 'faked',
# 'requestId': 927039000,
# 'admin': {
# 'getCannedMessagePluginPart2Response': {'text': 'foo2'},
# 'raw': msg2
# }
# },
# 'id': 589440320,
# 'rxTime': 1642710843,
# 'hopLimit': 3,
# 'priority': 'RELIABLE',
# 'raw': 'faked',
# 'fromId': '!28b54624',
# 'toId': '!28b54624'
# }
#
# iface = MagicMock(autospec=SerialInterface)
# with patch('meshtastic.serial_interface.SerialInterface', return_value=iface) as mo:
# anode = Node(mo, 'bar', noProto=True)
# # Note: Have to do this next line because every call to MagicMock object/method returns a new magic mock
# mo.localNode = anode
#
# with caplog.at_level(logging.DEBUG):
# anode.onResponseRequestCannedMessagePluginMessagePart2(packet)
# assert re.search(r'onResponseRequestCannedMessagePluginMessagePart2', caplog.text, re.MULTILINE)
# assert anode.cannedPluginMessagePart2 == 'foo2'
#@pytest.mark.unit
#def test_onResponseRequestCannedMessagePluginMesagePart3(caplog):
# """Test onResponseRequestCannedMessagePluginMessagePart3()"""
#
# part3 = CannedMessagePluginMessagePart3()
# part3.text = 'foo3'
#
# msg3 = MagicMock(autospec=AdminMessage)
# msg3.get_canned_message_plugin_part3_response = part3
#
# packet = {
# 'from': 682968612,
# 'to': 682968612,
# 'decoded': {
# 'portnum': 'ADMIN_APP',
# 'payload': 'faked',
# 'requestId': 927039000,
# 'admin': {
# 'getCannedMessagePluginPart3Response': {'text': 'foo3'},
# 'raw': msg3
# }
# },
# 'id': 589440320,
# 'rxTime': 1642710843,
# 'hopLimit': 3,
# 'priority': 'RELIABLE',
# 'raw': 'faked',
# 'fromId': '!28b54624',
# 'toId': '!28b54624'
# }
#
# iface = MagicMock(autospec=SerialInterface)
# with patch('meshtastic.serial_interface.SerialInterface', return_value=iface) as mo:
# anode = Node(mo, 'bar', noProto=True)
# # Note: Have to do this next line because every call to MagicMock object/method returns a new magic mock
# mo.localNode = anode
#
# with caplog.at_level(logging.DEBUG):
# anode.onResponseRequestCannedMessagePluginMessagePart3(packet)
# assert re.search(r'onResponseRequestCannedMessagePluginMessagePart3', caplog.text, re.MULTILINE)
# assert anode.cannedPluginMessagePart3 == 'foo3'
#@pytest.mark.unit
#def test_onResponseRequestCannedMessagePluginMesagePart4(caplog):
# """Test onResponseRequestCannedMessagePluginMessagePart4()"""
#
# part4 = CannedMessagePluginMessagePart4()
# part4.text = 'foo4'
#
# msg4 = MagicMock(autospec=AdminMessage)
# msg4.get_canned_message_plugin_part4_response = part4
#
# packet = {
# 'from': 682968612,
# 'to': 682968612,
# 'decoded': {
# 'portnum': 'ADMIN_APP',
# 'payload': 'faked',
# 'requestId': 927039000,
# 'admin': {
# 'getCannedMessagePluginPart4Response': {'text': 'foo4'},
# 'raw': msg4
# }
# },
# 'id': 589440320,
# 'rxTime': 1642710843,
# 'hopLimit': 3,
# 'priority': 'RELIABLE',
# 'raw': 'faked',
# 'fromId': '!28b54624',
# 'toId': '!28b54624'
# }
#
# iface = MagicMock(autospec=SerialInterface)
# with patch('meshtastic.serial_interface.SerialInterface', return_value=iface) as mo:
# anode = Node(mo, 'bar', noProto=True)
# # Note: Have to do this next line because every call to MagicMock object/method returns a new magic mock
# mo.localNode = anode
#
# with caplog.at_level(logging.DEBUG):
# anode.onResponseRequestCannedMessagePluginMessagePart4(packet)
# assert re.search(r'onResponseRequestCannedMessagePluginMessagePart4', caplog.text, re.MULTILINE)
# assert anode.cannedPluginMessagePart4 == 'foo4'
#@pytest.mark.unit
#def test_onResponseRequestCannedMessagePluginMesagePart5(caplog):
# """Test onResponseRequestCannedMessagePluginMessagePart5()"""
#
# part5 = CannedMessagePluginMessagePart5()
# part5.text = 'foo5'
#
# msg5 = MagicMock(autospec=AdminMessage)
# msg5.get_canned_message_plugin_part5_response = part5
#
#
# packet = {
# 'from': 682968612,
# 'to': 682968612,
# 'decoded': {
# 'portnum': 'ADMIN_APP',
# 'payload': 'faked',
# 'requestId': 927039000,
# 'admin': {
# 'getCannedMessagePluginPart5Response': {'text': 'foo5'},
# 'raw': msg5
# }
# },
# 'id': 589440320,
# 'rxTime': 1642710843,
# 'hopLimit': 3,
# 'priority': 'RELIABLE',
# 'raw': 'faked',
# 'fromId': '!28b54624',
# 'toId': '!28b54624'
# }
#
# iface = MagicMock(autospec=SerialInterface)
# with patch('meshtastic.serial_interface.SerialInterface', return_value=iface) as mo:
# anode = Node(mo, 'bar', noProto=True)
# # Note: Have to do this next line because every call to MagicMock object/method returns a new magic mock
# mo.localNode = anode
#
# with caplog.at_level(logging.DEBUG):
# anode.onResponseRequestCannedMessagePluginMessagePart5(packet)
# assert re.search(r'onResponseRequestCannedMessagePluginMessagePart5', caplog.text, re.MULTILINE)
# assert anode.cannedPluginMessagePart5 == 'foo5'
#@pytest.mark.unit
#def test_onResponseRequestCannedMessagePluginMesagePart1_error(caplog, capsys):
# """Test onResponseRequestCannedMessagePluginMessagePart1() with error"""
#
# packet = {
# 'decoded': {
# 'routing': {
# 'errorReason': 'some made up error',
# },
# },
# }
#
# iface = MagicMock(autospec=SerialInterface)
# with patch('meshtastic.serial_interface.SerialInterface', return_value=iface) as mo:
# anode = Node(mo, 'bar', noProto=True)
# # Note: Have to do this next line because every call to MagicMock object/method returns a new magic mock
# mo.localNode = anode
#
# with caplog.at_level(logging.DEBUG):
# anode.onResponseRequestCannedMessagePluginMessagePart1(packet)
# assert re.search(r'onResponseRequestCannedMessagePluginMessagePart1', caplog.text, re.MULTILINE)
# out, err = capsys.readouterr()
# assert re.search(r'Error on response', out)
# assert err == ''
#@pytest.mark.unit
#def test_onResponseRequestCannedMessagePluginMesagePart2_error(caplog, capsys):
# """Test onResponseRequestCannedMessagePluginMessagePart2() with error"""
#
# packet = {
# 'decoded': {
# 'routing': {
# 'errorReason': 'some made up error',
# },
# },
# }
#
# iface = MagicMock(autospec=SerialInterface)
# with patch('meshtastic.serial_interface.SerialInterface', return_value=iface) as mo:
# anode = Node(mo, 'bar', noProto=True)
# # Note: Have to do this next line because every call to MagicMock object/method returns a new magic mock
# mo.localNode = anode
#
# with caplog.at_level(logging.DEBUG):
# anode.onResponseRequestCannedMessagePluginMessagePart2(packet)
# assert re.search(r'onResponseRequestCannedMessagePluginMessagePart2', caplog.text, re.MULTILINE)
# out, err = capsys.readouterr()
# assert re.search(r'Error on response', out)
# assert err == ''
#@pytest.mark.unit
#def test_onResponseRequestCannedMessagePluginMesagePart3_error(caplog, capsys):
# """Test onResponseRequestCannedMessagePluginMessagePart3() with error"""
#
# packet = {
# 'decoded': {
# 'routing': {
# 'errorReason': 'some made up error',
# },
# },
# }
#
# iface = MagicMock(autospec=SerialInterface)
# with patch('meshtastic.serial_interface.SerialInterface', return_value=iface) as mo:
# anode = Node(mo, 'bar', noProto=True)
# # Note: Have to do this next line because every call to MagicMock object/method returns a new magic mock
# mo.localNode = anode
#
# with caplog.at_level(logging.DEBUG):
# anode.onResponseRequestCannedMessagePluginMessagePart3(packet)
# assert re.search(r'onResponseRequestCannedMessagePluginMessagePart3', caplog.text, re.MULTILINE)
# out, err = capsys.readouterr()
# assert re.search(r'Error on response', out)
# assert err == ''
#
#
#@pytest.mark.unit
#def test_onResponseRequestCannedMessagePluginMesagePart4_error(caplog, capsys):
# """Test onResponseRequestCannedMessagePluginMessagePart4() with error"""
#
# packet = {
# 'decoded': {
# 'routing': {
# 'errorReason': 'some made up error',
# },
# },
# }
#
# iface = MagicMock(autospec=SerialInterface)
# with patch('meshtastic.serial_interface.SerialInterface', return_value=iface) as mo:
# anode = Node(mo, 'bar', noProto=True)
# # Note: Have to do this next line because every call to MagicMock object/method returns a new magic mock
# mo.localNode = anode
#
# with caplog.at_level(logging.DEBUG):
# anode.onResponseRequestCannedMessagePluginMessagePart4(packet)
# assert re.search(r'onResponseRequestCannedMessagePluginMessagePart4', caplog.text, re.MULTILINE)
# out, err = capsys.readouterr()
# assert re.search(r'Error on response', out)
# assert err == ''
#
#
#@pytest.mark.unit
#def test_onResponseRequestCannedMessagePluginMesagePart5_error(caplog, capsys):
# """Test onResponseRequestCannedMessagePluginMessagePart5() with error"""
#
# packet = {
# 'decoded': {
# 'routing': {
# 'errorReason': 'some made up error',
# },
# },
# }
#
# iface = MagicMock(autospec=SerialInterface)
# with patch('meshtastic.serial_interface.SerialInterface', return_value=iface) as mo:
# anode = Node(mo, 'bar', noProto=True)
# # Note: Have to do this next line because every call to MagicMock object/method returns a new magic mock
# mo.localNode = anode
#
# with caplog.at_level(logging.DEBUG):
# anode.onResponseRequestCannedMessagePluginMessagePart5(packet)
# assert re.search(r'onResponseRequestCannedMessagePluginMessagePart5', caplog.text, re.MULTILINE)
# out, err = capsys.readouterr()
# assert re.search(r'Error on response', out)
# assert err == ''
@pytest.mark.unit @pytest.mark.unit
def test_onResponseRequestChannel(caplog): def test_onResponseRequestChannel(caplog):
"""Test onResponseRequestChannel()""" """Test onResponseRequestChannel()"""

View File

@@ -21,7 +21,7 @@ from ..util import findPorts
# seconds to pause after running a meshtastic command # seconds to pause after running a meshtastic command
PAUSE_AFTER_COMMAND = 0.1 PAUSE_AFTER_COMMAND = 0.1
PAUSE_AFTER_REBOOT = 0.1 PAUSE_AFTER_REBOOT = 0.2
#TODO: need to fix the virtual device to have a reboot. When you issue the command #TODO: need to fix the virtual device to have a reboot. When you issue the command
@@ -358,6 +358,11 @@ def test_smokevirt_ch_set_downlink_and_uplink():
@pytest.mark.smokevirt @pytest.mark.smokevirt
def test_smokevirt_ch_add_and_ch_del(): def test_smokevirt_ch_add_and_ch_del():
"""Test --ch-add""" """Test --ch-add"""
return_value, out = subprocess.getstatusoutput('meshtastic --host localhost --ch-index 1 --ch-del')
assert re.search(r'Deleting channel 1', out, re.MULTILINE)
assert return_value == 0
# pause for the radio
time.sleep(PAUSE_AFTER_REBOOT)
return_value, out = subprocess.getstatusoutput('meshtastic --host localhost --ch-add testing') return_value, out = subprocess.getstatusoutput('meshtastic --host localhost --ch-add testing')
assert re.search(r'Writing modified channels to device', out, re.MULTILINE) assert re.search(r'Writing modified channels to device', out, re.MULTILINE)
assert return_value == 0 assert return_value == 0
@@ -375,7 +380,7 @@ def test_smokevirt_ch_add_and_ch_del():
assert return_value == 0 assert return_value == 0
# pause for the radio # pause for the radio
time.sleep(PAUSE_AFTER_REBOOT) time.sleep(PAUSE_AFTER_REBOOT)
# make sure the secondar channel is not there # make sure the secondary channel is not there
return_value, out = subprocess.getstatusoutput('meshtastic --host localhost --info') return_value, out = subprocess.getstatusoutput('meshtastic --host localhost --info')
assert re.match(r'Connected to radio', out) assert re.match(r'Connected to radio', out)
assert not re.search(r'SECONDARY', out, re.MULTILINE) assert not re.search(r'SECONDARY', out, re.MULTILINE)
@@ -386,6 +391,11 @@ def test_smokevirt_ch_add_and_ch_del():
@pytest.mark.smokevirt @pytest.mark.smokevirt
def test_smokevirt_ch_enable_and_disable(): def test_smokevirt_ch_enable_and_disable():
"""Test --ch-enable and --ch-disable""" """Test --ch-enable and --ch-disable"""
return_value, out = subprocess.getstatusoutput('meshtastic --host localhost --ch-index 1 --ch-del')
assert re.search(r'Deleting channel 1', out, re.MULTILINE)
assert return_value == 0
# pause for the radio
time.sleep(PAUSE_AFTER_REBOOT)
return_value, out = subprocess.getstatusoutput('meshtastic --host localhost --ch-add testing') return_value, out = subprocess.getstatusoutput('meshtastic --host localhost --ch-add testing')
assert re.search(r'Writing modified channels to device', out, re.MULTILINE) assert re.search(r'Writing modified channels to device', out, re.MULTILINE)
assert return_value == 0 assert return_value == 0
@@ -434,6 +444,11 @@ def test_smokevirt_ch_enable_and_disable():
@pytest.mark.smokevirt @pytest.mark.smokevirt
def test_smokevirt_ch_del_a_disabled_non_primary_channel(): def test_smokevirt_ch_del_a_disabled_non_primary_channel():
"""Test --ch-del will work on a disabled non-primary channel.""" """Test --ch-del will work on a disabled non-primary channel."""
return_value, out = subprocess.getstatusoutput('meshtastic --host localhost --ch-index 1 --ch-del')
assert re.search(r'Deleting channel 1', out, re.MULTILINE)
assert return_value == 0
# pause for the radio
time.sleep(PAUSE_AFTER_REBOOT)
return_value, out = subprocess.getstatusoutput('meshtastic --host localhost --ch-add testing') return_value, out = subprocess.getstatusoutput('meshtastic --host localhost --ch-add testing')
assert re.search(r'Writing modified channels to device', out, re.MULTILINE) assert re.search(r'Writing modified channels to device', out, re.MULTILINE)
assert return_value == 0 assert return_value == 0

View File

@@ -12,7 +12,9 @@ from meshtastic.util import (fixme, stripnl, pskToString, our_exit,
remove_keys_from_dict, Timeout, hexstr, remove_keys_from_dict, Timeout, hexstr,
ipstr, readnet_u16, findPorts, convert_mac_addr, ipstr, readnet_u16, findPorts, convert_mac_addr,
snake_to_camel, camel_to_snake, eliminate_duplicate_port, snake_to_camel, camel_to_snake, eliminate_duplicate_port,
is_windows11) is_windows11, active_ports_on_supported_devices)
from meshtastic.supported_device import SupportedDevice
@pytest.mark.unit @pytest.mark.unit
@@ -264,6 +266,22 @@ def test_findPorts_when_duplicate_found_and_duplicate_option_used(patch_comports
patch_comports.assert_called() patch_comports.assert_called()
@pytest.mark.unitslow
@patch('serial.tools.list_ports.comports')
def test_findPorts_when_duplicate_found_and_duplicate_option_used_ports_reversed(patch_comports):
"""Test findPorts()"""
class TempPort:
""" temp class for port"""
def __init__(self, device=None, vid=None):
self.device = device
self.vid = vid
fake1 = TempPort('/dev/cu.usbserial-1430', vid='fake1')
fake2 = TempPort('/dev/cu.wchusbserial1430', vid='fake2')
patch_comports.return_value = [fake2, fake1]
assert findPorts(eliminate_duplicates=True) == ['/dev/cu.wchusbserial1430']
patch_comports.assert_called()
@pytest.mark.unitslow @pytest.mark.unitslow
@patch('serial.tools.list_ports.comports') @patch('serial.tools.list_ports.comports')
def test_findPorts_when_duplicate_found_and_duplicate_option_not_used(patch_comports): def test_findPorts_when_duplicate_found_and_duplicate_option_not_used(patch_comports):
@@ -315,8 +333,13 @@ def test_eliminate_duplicate_port():
assert eliminate_duplicate_port(['/dev/fake', '/dev/fake1']) == ['/dev/fake', '/dev/fake1'] assert eliminate_duplicate_port(['/dev/fake', '/dev/fake1']) == ['/dev/fake', '/dev/fake1']
assert eliminate_duplicate_port(['/dev/fake', '/dev/fake1', '/dev/fake2']) == ['/dev/fake', '/dev/fake1', '/dev/fake2'] assert eliminate_duplicate_port(['/dev/fake', '/dev/fake1', '/dev/fake2']) == ['/dev/fake', '/dev/fake1', '/dev/fake2']
assert eliminate_duplicate_port(['/dev/cu.usbserial-1430', '/dev/cu.wchusbserial1430']) == ['/dev/cu.wchusbserial1430'] assert eliminate_duplicate_port(['/dev/cu.usbserial-1430', '/dev/cu.wchusbserial1430']) == ['/dev/cu.wchusbserial1430']
assert eliminate_duplicate_port(['/dev/cu.wchusbserial1430', '/dev/cu.usbserial-1430']) == ['/dev/cu.wchusbserial1430']
assert eliminate_duplicate_port(['/dev/cu.SLAB_USBtoUART', '/dev/cu.usbserial-0001']) == ['/dev/cu.usbserial-0001'] assert eliminate_duplicate_port(['/dev/cu.SLAB_USBtoUART', '/dev/cu.usbserial-0001']) == ['/dev/cu.usbserial-0001']
assert eliminate_duplicate_port(['/dev/cu.usbserial-0001', '/dev/cu.SLAB_USBtoUART']) == ['/dev/cu.usbserial-0001']
assert eliminate_duplicate_port(['/dev/cu.usbmodem11301', '/dev/cu.wchusbserial11301']) == ['/dev/cu.wchusbserial11301'] assert eliminate_duplicate_port(['/dev/cu.usbmodem11301', '/dev/cu.wchusbserial11301']) == ['/dev/cu.wchusbserial11301']
assert eliminate_duplicate_port(['/dev/cu.usbmodem53230051441', '/dev/cu.wchusbserial53230051441']) == ['/dev/cu.wchusbserial53230051441']
assert eliminate_duplicate_port(['/dev/cu.wchusbserial53230051441', '/dev/cu.usbmodem53230051441']) == ['/dev/cu.wchusbserial53230051441']
assert eliminate_duplicate_port(['/dev/cu.wchusbserial11301', '/dev/cu.usbmodem11301']) == ['/dev/cu.wchusbserial11301']
@patch('platform.version', return_value='10.0.22000.194') @patch('platform.version', return_value='10.0.22000.194')
@patch('platform.release', return_value='10') @patch('platform.release', return_value='10')
@@ -358,3 +381,78 @@ def test_is_windows11_false_win8_1(patched_platform, patched_release):
assert is_windows11() is False assert is_windows11() is False
patched_platform.assert_called() patched_platform.assert_called()
patched_release.assert_called() patched_release.assert_called()
@pytest.mark.unit
@patch('platform.system', return_value='Linux')
def test_active_ports_on_supported_devices_empty(mock_platform):
"""Test active_ports_on_supported_devices()"""
sds = set()
assert active_ports_on_supported_devices(sds) == set()
mock_platform.assert_called()
@pytest.mark.unit
@patch('subprocess.getstatusoutput')
@patch('platform.system', return_value='Linux')
def test_active_ports_on_supported_devices_linux(mock_platform, mock_sp):
"""Test active_ports_on_supported_devices()"""
mock_sp.return_value = (None, 'crw-rw-rw- 1 root wheel 0x9000000 Feb 8 22:22 /dev/ttyUSBfake')
fake_device = SupportedDevice(name='a', for_firmware='heltec-v2.1', baseport_on_linux='ttyUSB')
fake_supported_devices = [fake_device]
assert active_ports_on_supported_devices(fake_supported_devices) == {'/dev/ttyUSBfake'}
mock_platform.assert_called()
mock_sp.assert_called()
@pytest.mark.unit
@patch('subprocess.getstatusoutput')
@patch('platform.system', return_value='Darwin')
def test_active_ports_on_supported_devices_mac(mock_platform, mock_sp):
"""Test active_ports_on_supported_devices()"""
mock_sp.return_value = (None, 'crw-rw-rw- 1 root wheel 0x9000000 Feb 8 22:22 /dev/cu.usbserial-foo')
fake_device = SupportedDevice(name='a', for_firmware='heltec-v2.1', baseport_on_linux='cu.usbserial-')
fake_supported_devices = [fake_device]
assert active_ports_on_supported_devices(fake_supported_devices) == {'/dev/cu.usbserial-foo'}
mock_platform.assert_called()
mock_sp.assert_called()
@pytest.mark.unit
@patch('meshtastic.util.detect_windows_port', return_value={'COM2'})
@patch('platform.system', return_value='Windows')
def test_active_ports_on_supported_devices_win(mock_platform, mock_dwp):
"""Test active_ports_on_supported_devices()"""
fake_device = SupportedDevice(name='a', for_firmware='heltec-v2.1')
fake_supported_devices = [fake_device]
assert active_ports_on_supported_devices(fake_supported_devices) == {'COM2'}
mock_platform.assert_called()
mock_dwp.assert_called()
@pytest.mark.unit
@patch('subprocess.getstatusoutput')
@patch('platform.system', return_value='Darwin')
def test_active_ports_on_supported_devices_mac_no_duplicates_check(mock_platform, mock_sp):
"""Test active_ports_on_supported_devices()"""
mock_sp.return_value = (None, ('crw-rw-rw- 1 root wheel 0x9000005 Mar 8 10:05 /dev/cu.usbmodem53230051441\n'
'crw-rw-rw- 1 root wheel 0x9000003 Mar 8 10:06 /dev/cu.wchusbserial53230051441'))
fake_device = SupportedDevice(name='a', for_firmware='tbeam', baseport_on_mac='cu.usbmodem')
fake_supported_devices = [fake_device]
assert active_ports_on_supported_devices(fake_supported_devices, False) == {'/dev/cu.usbmodem53230051441', '/dev/cu.wchusbserial53230051441'}
mock_platform.assert_called()
mock_sp.assert_called()
@pytest.mark.unit
@patch('subprocess.getstatusoutput')
@patch('platform.system', return_value='Darwin')
def test_active_ports_on_supported_devices_mac_duplicates_check(mock_platform, mock_sp):
"""Test active_ports_on_supported_devices()"""
mock_sp.return_value = (None, ('crw-rw-rw- 1 root wheel 0x9000005 Mar 8 10:05 /dev/cu.usbmodem53230051441\n'
'crw-rw-rw- 1 root wheel 0x9000003 Mar 8 10:06 /dev/cu.wchusbserial53230051441'))
fake_device = SupportedDevice(name='a', for_firmware='tbeam', baseport_on_mac='cu.usbmodem')
fake_supported_devices = [fake_device]
assert active_ports_on_supported_devices(fake_supported_devices, True) == {'/dev/cu.wchusbserial53230051441'}
mock_platform.assert_called()
mock_sp.assert_called()

View File

@@ -14,7 +14,8 @@ import subprocess
import serial import serial
import serial.tools.list_ports import serial.tools.list_ports
import pkg_resources import pkg_resources
from meshtastic.supported_device import get_unique_vendor_ids, get_devices_with_vendor_id
from meshtastic.supported_device import supported_devices
"""Some devices such as a seger jlink we never want to accidentally open""" """Some devices such as a seger jlink we never want to accidentally open"""
blacklistVids = dict.fromkeys([0x1366]) blacklistVids = dict.fromkeys([0x1366])
@@ -401,6 +402,7 @@ def eliminate_duplicate_port(ports):
if len(ports) != 2: if len(ports) != 2:
new_ports = ports new_ports = ports
else: else:
ports.sort()
if 'usbserial' in ports[0] and 'wchusbserial' in ports[1]: if 'usbserial' in ports[0] and 'wchusbserial' in ports[1]:
first = ports[0].replace("usbserial-", "") first = ports[0].replace("usbserial-", "")
second = ports[1].replace("wchusbserial", "") second = ports[1].replace("wchusbserial", "")
@@ -432,3 +434,112 @@ def is_windows11():
except Exception as e: except Exception as e:
print(f'problem detecting win11 e:{e}') print(f'problem detecting win11 e:{e}')
return is_win11 return is_win11
def get_unique_vendor_ids():
"""Return a set of unique vendor ids"""
vids = set()
for d in supported_devices:
if d.usb_vendor_id_in_hex:
vids.add(d.usb_vendor_id_in_hex)
return vids
def get_devices_with_vendor_id(vid):
"""Return a set of unique devices with the vendor id"""
sd = set()
for d in supported_devices:
if d.usb_vendor_id_in_hex == vid:
sd.add(d)
return sd
def active_ports_on_supported_devices(sds, eliminate_duplicates=False):
"""Return a set of active ports based on the supplied supported devices"""
ports = set()
baseports = set()
system = platform.system()
# figure out what possible base ports there are
for d in sds:
if system == "Linux":
baseports.add(d.baseport_on_linux)
elif system == "Darwin":
baseports.add(d.baseport_on_mac)
elif system == "Windows":
baseports.add(d.baseport_on_windows)
for bp in baseports:
if system == "Linux":
# see if we have any devices (ignoring any stderr output)
command = f'ls -al /dev/{bp}* 2> /dev/null'
#print(f'command:{command}')
_, ls_output = subprocess.getstatusoutput(command)
#print(f'ls_output:{ls_output}')
# if we got output, there are ports
if len(ls_output) > 0:
#print('got output')
# for each line of output
lines = ls_output.split('\n')
#print(f'lines:{lines}')
for line in lines:
parts = line.split(' ')
#print(f'parts:{parts}')
port = parts[-1]
#print(f'port:{port}')
ports.add(port)
elif system == "Darwin":
# see if we have any devices (ignoring any stderr output)
command = f'ls -al /dev/{bp}* 2> /dev/null'
#print(f'command:{command}')
_, ls_output = subprocess.getstatusoutput(command)
#print(f'ls_output:{ls_output}')
# if we got output, there are ports
if len(ls_output) > 0:
#print('got output')
# for each line of output
lines = ls_output.split('\n')
#print(f'lines:{lines}')
for line in lines:
parts = line.split(' ')
#print(f'parts:{parts}')
port = parts[-1]
#print(f'port:{port}')
ports.add(port)
elif system == "Windows":
# for each device in supported devices found
for d in sds:
# find the port(s)
com_ports = detect_windows_port(d)
#print(f'com_ports:{com_ports}')
# add all ports
for com_port in com_ports:
ports.add(com_port)
if eliminate_duplicates:
ports = eliminate_duplicate_port(list(ports))
ports.sort()
ports = set(ports)
return ports
def detect_windows_port(sd):
"""detect if Windows port"""
ports = set()
if sd:
system = platform.system()
if system == "Windows":
command = ('powershell.exe "[Console]::OutputEncoding = [Text.UTF8Encoding]::UTF8;'
'Get-PnpDevice -PresentOnly | Where-Object{ ($_.DeviceId -like ')
command += f"'*{sd.usb_vendor_id_in_hex.upper()}*'"
command += ')} | Format-List"'
#print(f'command:{command}')
_, sp_output = subprocess.getstatusoutput(command)
#print(f'sp_output:{sp_output}')
p = re.compile(r'\(COM(.*)\)')
for x in p.findall(sp_output):
#print(f'x:{x}')
ports.add(f'COM{x}')
return ports

2
proto

Submodule proto updated: 6a66f8b1f8...c851209e0b

View File

@@ -12,7 +12,7 @@ with open("README.md", "r") as fh:
# This call to setup() does all the work # This call to setup() does all the work
setup( setup(
name="meshtastic", name="meshtastic",
version="1.2.85", version="1.2.94",
description="Python API & client shell for talking to Meshtastic devices", description="Python API & client shell for talking to Meshtastic devices",
long_description=long_description, long_description=long_description,
long_description_content_type="text/markdown", long_description_content_type="text/markdown",

View File

@@ -2,6 +2,6 @@ readme.txt for single standalone executable zip files that can be
downloaded from https://github.com/meshtastic/Meshtastic-python/releases downloaded from https://github.com/meshtastic/Meshtastic-python/releases
If you do not want to install python and/or the python libraries, you can download one of these If you do not want to install python and/or the python libraries, you can download one of these
zip files to run the Meshtastic command line interface (CLI) as a standalone executable. files to run the Meshtastic command line interface (CLI) as a standalone executable.
See https://meshtastic.org/docs/software/python/python-standalone for more info. See https://meshtastic.org/docs/software/python/python-standalone for more info.