Compare commits

...

45 Commits

Author SHA1 Message Date
mkinney
7903a4720d Merge pull request #330 from mkinney/1.2-legacy
1.2 legacy
2022-04-27 12:04:37 -07:00
mkinney
19648e3418 Merge branch 'meshtastic:1.2-legacy' into 1.2-legacy 2022-04-27 12:01:18 -07:00
Mike Kinney
49ee6b5988 update protobufs for 1.2 2022-04-27 12:00:37 -07:00
github-actions
5ac9867a91 bump version 2022-04-27 18:37:44 +00:00
mkinney
efa9469602 Merge pull request #329 from mkinney/1.2-legacy
1.2 legacy
2022-04-27 11:34:28 -07:00
Mike Kinney
e2f38b640b remove epaper 2022-04-27 11:32:14 -07:00
Mike Kinney
60d7540950 add nano_g1 2022-04-27 11:31:19 -07:00
github-actions
5761c89818 bump version 2022-03-21 17:49:55 +00:00
mkinney
d205d8e9ba Merge pull request #309 from mkinney/wifi_password_check
add wifi min length check
2022-03-21 10:48:05 -07:00
Mike Kinney
384ad456ae add wifi min length check 2022-03-21 10:44:46 -07:00
github-actions
a99397468f bump version 2022-03-16 02:59:04 +00:00
mkinney
f31297194a Merge pull request #305 from mkinney/1.2-legacy
fix the vendor and product id for tlora v1
2022-03-15 19:53:29 -07:00
Mike Kinney
4dc3bea674 fix the vendor and product id for tlora v1 2022-03-15 19:52:45 -07:00
github-actions
95f5d77c47 bump version 2022-03-16 02:34:58 +00:00
mkinney
5034e2ca43 Merge pull request #304 from mkinney/1.2-legacy
fix some tlora entries
2022-03-15 19:33:47 -07:00
Mike Kinney
eb4f68fccc fix some tlora entries 2022-03-15 19:32:27 -07:00
github-actions
de03f8e1fb bump version 2022-03-08 18:57:44 +00:00
mkinney
56f3e8a893 Merge pull request #297 from mkinney/1.2-legacy
refactor util; add duplicate check
2022-03-08 10:38:37 -08:00
mkinney
fbe0c09909 Merge branch '1.2-legacy' into 1.2-legacy 2022-03-08 10:37:15 -08:00
Mike Kinney
20c65974e9 refactor code to util 2022-03-08 10:34:46 -08:00
Mike Kinney
96e42ac3f2 add duplicate check 2022-03-08 10:24:44 -08:00
github-actions
3912f5728a bump version 2022-03-08 06:40:00 +00:00
mkinney
c016176520 Update release.yml 2022-03-07 22:38:45 -08:00
Mike Kinney
e6999ba5ad keep sha ref and use it for subsequent checkouts 2022-03-07 22:38:27 -08:00
Mike Kinney
5590dbeb6f do not checkout the code again 2022-03-07 22:38:06 -08:00
mkinney
e8a2909173 Merge pull request #294 from mkinney/bug_when_ports_not_sorted
fix when ports are not sorted
2022-03-07 22:37:49 -08:00
github-actions
9214b2ffcc bump version 2022-03-02 00:20:31 +00:00
mkinney
cfb14d4b77 Merge pull request #287 from mkinney/changes_to_smokevirt
increase timeout; ensure secondary channel is deleted before trying t…
2022-03-01 16:19:14 -08:00
Mike Kinney
5965615e17 increase timeout; ensure secondary channel is deleted before trying to add it 2022-03-02 00:04:21 +00:00
Jm Casler
40eb7d8515 updating proto submodule to latest 2022-02-28 22:03:42 -08:00
Jm Casler
e2f36a9bea updating proto submodule to latest 2022-02-28 19:36:02 -08:00
Jm Casler
a0ba644488 updating proto submodule to latest 2022-02-27 10:01:44 -08:00
Jm Casler
2f67f344b7 updating proto submodule to latest 2022-02-27 09:57:16 -08:00
Jm Casler
8bb208aed1 updating proto submodule to latest 2022-02-27 09:52:34 -08:00
Jm Casler
48b145a592 updating proto submodule to latest 2022-02-27 01:00:38 -08:00
Jm Casler
65de9200fb updating proto submodule to latest 2022-02-27 00:48:57 -08:00
Jm Casler
e51d7a7a18 updating proto submodule to latest 2022-02-27 00:38:09 -08:00
Jm Casler
c4af50d63a updating proto submodule to latest 2022-02-26 21:22:33 -08:00
Jm Casler
933fe8953a updating proto submodule to latest 2022-02-26 21:09:20 -08:00
mkinney
66aa492d50 Merge pull request #283 from mkinney/add_cm
add cm
2022-02-25 23:44:13 -08:00
Mike Kinney
730934f520 add cm 2022-02-25 23:36:05 -08:00
mkinney
d2ec09eaf8 Merge pull request #281 from prampec/extend_canned_message_length
Canned message - Extend messages length
2022-02-25 22:58:57 -08:00
mkinney
7fd3b313b2 Update release.yml 2022-02-25 22:15:36 -08:00
github-actions
89f1549741 bump version 2022-02-26 06:11:26 +00:00
Balazs Kelemen
b56a054f50 Canned message - Extend messages length 2022-02-21 22:06:03 +01:00
26 changed files with 1195 additions and 221 deletions

View File

@@ -8,6 +8,7 @@ jobs:
outputs:
version: ${{ steps.get_version.outputs.version }}
upload_url: ${{ steps.create_release.outputs.upload_url }}
new_sha: ${{ steps.commit_updated.outputs.sha }}
steps:
@@ -19,12 +20,14 @@ jobs:
bin/bump_version.py
- name: Commit updated version.py
id: commit_updated
run: |
git config --global user.name 'github-actions'
git config --global user.email 'bot@noreply.github.com'
git remote set-url origin https://x-access-token:${{ secrets.GITHUB_TOKEN }}@github.com/${{ github.repository }}
git add setup.py
git commit -m "bump version" && git push || echo "No changes to commit"
git log -n 1 --pretty=format:"%H" | tail -n 1 | awk '{print "::set-output name=sha::"$0}'
- name: Get version
id: get_version
@@ -38,16 +41,13 @@ jobs:
with:
draft: true
prerelease: true
release_name: Meshtastic ${{ steps.get_version.outputs.version }}
release_name: Meshtastic Python ${{ steps.get_version.outputs.version }}
tag_name: ${{ steps.get_version.outputs.version }}
body: |
Autogenerated by github action, developer should edit as required before publishing...
env:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
- name: Checkout
uses: actions/checkout@v2
- name: Set up Python 3.9
uses: actions/setup-python@v2
with:
@@ -82,7 +82,9 @@ jobs:
steps:
- name: Checkout
uses: actions/checkout@v2
uses: actions/checkout@v3
with:
ref: ${{ needs.release_create.outputs.new_sha }}
- name: Set up Python 3.9
uses: actions/setup-python@v2
@@ -127,7 +129,9 @@ jobs:
steps:
- name: Checkout
uses: actions/checkout@v2
uses: actions/checkout@v3
with:
ref: ${{ needs.release_create.outputs.new_sha }}
- name: Set up Python 3.9
uses: actions/setup-python@v2
@@ -167,7 +171,9 @@ jobs:
steps:
- name: Checkout
uses: actions/checkout@v2
uses: actions/checkout@v3
with:
ref: ${{ needs.release_create.outputs.new_sha }}
- name: Set up Python 3.9
uses: actions/setup-python@v2

1
.gitignore vendored
View File

@@ -6,6 +6,7 @@ dist
log_*
.eggs
nanopb-0.4.4
nanopb-0.4.5
.*swp
.coverage
*.py-E

1
.gitmodules vendored
View File

@@ -1,3 +1,4 @@
[submodule "proto"]
path = proto
url = https://github.com/meshtastic/Meshtastic-protobufs.git
branch = 1.2-legacy

View File

@@ -23,7 +23,7 @@ ignore-patterns=mqtt_pb2.py,channel_pb2.py,environmental_measurement_pb2.py,admi
# no Warning level messages displayed, use"--disable=all --enable=classes
# --disable=W"
#
disable=invalid-name,fixme,logging-fstring-interpolation,too-many-statements,too-many-branches,too-many-locals,no-member,f-string-without-interpolation,protected-access,no-self-use,pointless-string-statement,too-few-public-methods,broad-except,no-else-return,no-else-raise,bare-except
disable=invalid-name,fixme,logging-fstring-interpolation,too-many-statements,too-many-branches,too-many-locals,no-member,f-string-without-interpolation,protected-access,no-self-use,pointless-string-statement,too-few-public-methods,broad-except,no-else-return,no-else-raise,bare-except,too-many-public-methods
[BASIC]
@@ -41,7 +41,7 @@ bad-names=foo,bar,baz,toto,tutu,tata
max-line-length=150
# Maximum number of lines in a module
max-module-lines=1200
max-module-lines=1400

View File

@@ -22,6 +22,12 @@ lint:
slow:
pytest -m unit --durations=5
proto: FORCE
git submodule update --init --recursive
git pull --rebase
git submodule update --remote --merge
./bin/regen-protos.sh
# run the coverage report and open results in a browser
cov:
pytest --cov-report html --cov=meshtastic

View File

@@ -3,8 +3,7 @@
"""
import sys
from meshtastic.supported_device import get_unique_vendor_ids, active_ports_on_supported_devices
from meshtastic.util import detect_supported_devices
from meshtastic.util import detect_supported_devices, get_unique_vendor_ids, active_ports_on_supported_devices
# simple arg check
if len(sys.argv) != 1:

6
examples/show_ports.py Normal file
View File

@@ -0,0 +1,6 @@
"""Simple program to show serial ports.
"""
from meshtastic.util import findPorts
print(findPorts())

30
info/mac/nano_g1.txt Normal file
View File

@@ -0,0 +1,30 @@
meshtastic detected port: /dev/cu.wchusbserial53820208781
ioreg -p IOUSB
shows this:
| +-o USB Single Serial@14300000 <class AppleUSBDevice, id 0x1000407a5, registered, matched, active, busy 0 (18 ms), retain 14>
system_profiler SPUSBDataType > /tmp/a
with device plugged in
system_profiler SPUSBDataType > /tmp/b
with device not plugged in
diff /tmp/a /tmp/b
< USB Single Serial:
<
< Product ID: 0x55d4
< Vendor ID: 0x1a86
< Version: 4.43
< Serial Number: 5382020878
< Speed: Up to 12 Mb/s
< Location ID: 0x14300000 / 63
< Current Available (mA): 500
< Current Required (mA): 134
< Extra Operating Current (mA): 0

91
info/ubuntu/nano_g1.txt Normal file
View File

@@ -0,0 +1,91 @@
lsusb -d 1a86: -v
Bus 001 Device 013: ID 1a86:55d4 QinHeng Electronics
Couldn't open device, some information will be missing
Device Descriptor:
bLength 18
bDescriptorType 1
bcdUSB 1.10
bDeviceClass 2 Communications
bDeviceSubClass 0
bDeviceProtocol 0
bMaxPacketSize0 8
idVendor 0x1a86 QinHeng Electronics
idProduct 0x55d4
bcdDevice 4.43
iManufacturer 0
iProduct 2
iSerial 3
bNumConfigurations 1
Configuration Descriptor:
bLength 9
bDescriptorType 2
wTotalLength 0x0043
bNumInterfaces 2
bConfigurationValue 1
iConfiguration 0
bmAttributes 0xa0
(Bus Powered)
Remote Wakeup
MaxPower 134mA
Interface Descriptor:
bLength 9
bDescriptorType 4
bInterfaceNumber 0
bAlternateSetting 0
bNumEndpoints 1
bInterfaceClass 2 Communications
bInterfaceSubClass 2 Abstract (modem)
bInterfaceProtocol 1 AT-commands (v.25ter)
iInterface 0
CDC Header:
bcdCDC 1.10
CDC Call Management:
bmCapabilities 0x00
bDataInterface 1
CDC ACM:
bmCapabilities 0x02
line coding and serial state
CDC Union:
bMasterInterface 0
bSlaveInterface 1
Endpoint Descriptor:
bLength 7
bDescriptorType 5
bEndpointAddress 0x83 EP 3 IN
bmAttributes 3
Transfer Type Interrupt
Synch Type None
Usage Type Data
wMaxPacketSize 0x0010 1x 16 bytes
bInterval 1
Interface Descriptor:
bLength 9
bDescriptorType 4
bInterfaceNumber 1
bAlternateSetting 0
bNumEndpoints 2
bInterfaceClass 10 CDC Data
bInterfaceSubClass 0
bInterfaceProtocol 0
iInterface 0
Endpoint Descriptor:
bLength 7
bDescriptorType 5
bEndpointAddress 0x02 EP 2 OUT
bmAttributes 2
Transfer Type Bulk
Synch Type None
Usage Type Data
wMaxPacketSize 0x0020 1x 32 bytes
bInterval 0
Endpoint Descriptor:
bLength 7
bDescriptorType 5
bEndpointAddress 0x82 EP 2 IN
bmAttributes 2
Transfer Type Bulk
Synch Type None
Usage Type Data
wMaxPacketSize 0x0040 1x 64 bytes
bInterval 0

39
info/windows/nano_g1.txt Normal file
View File

@@ -0,0 +1,39 @@
Get-PnpDevice -PresentOnly | Format-List >a
Get-PnpDevice -PresentOnly | Format-List >b
Compare-Object (get-content a) (Get-Content b)
InputObject SideIndicator
----------- -------------
Caption : USB-Enhanced-SERIAL CH9102 (COM9) =>
Description : USB-Enhanced-SERIAL CH9102 =>
Name : USB-Enhanced-SERIAL CH9102 (COM9) =>
DeviceID : USB\VID_1A86&PID_55D4\5382020745 =>
PNPDeviceID : USB\VID_1A86&PID_55D4\5382020745 =>
ClassGuid : {4d36e978-e325-11ce-bfc1-08002be10318} =>
CompatibleID : {USB\Class_02&SubClass_02&Prot_01, USB\Class_02&SubClass_02, USB\Class_02} =>
HardwareID : {USB\VID_1A86&PID_55D4&REV_0443, USB\VID_1A86&PID_55D4} =>
Manufacturer : wch.cn =>
PNPClass : Ports =>
Service : CH343SER_A64 =>
Class : Ports =>
FriendlyName : USB-Enhanced-SERIAL CH9102 (COM9) =>
InstanceId : USB\VID_1A86&PID_55D4\5382020745 =>
InstallDate : =>
Status : OK =>
Availability : =>
ConfigManagerErrorCode : CM_PROB_NONE =>
ConfigManagerUserConfig : False =>
CreationClassName : Win32_PnPEntity =>
ErrorCleared : =>
ErrorDescription : =>
LastErrorCode : =>
PowerManagementCapabilities : =>
PowerManagementSupported : =>
StatusInfo : =>
SystemCreationClassName : Win32_ComputerSystem =>
SystemName : DESKTOP-FRFQN8H =>
Present : True =>
PSComputerName : =>
Problem : CM_PROB_NONE =>
ProblemDescription : =>
=>

View File

@@ -119,6 +119,10 @@ def setPref(attributes, name, valStr):
val = meshtastic.util.fromStr(valStr)
logging.debug(f'valStr:{valStr} val:{val}')
if snake_name == 'wifi_password' and len(valStr) < 8:
print(f"Warning: wifi_password must be 8 or more characters.")
return
enumType = field.enum_type
# pylint: disable=C0123
if enumType and type(val) == str:
@@ -200,6 +204,12 @@ def onConnected(interface):
print(f"Setting device owner short to {args.set_owner_short}")
interface.getNode(args.dest).setOwner(long_name=None, short_name=args.set_owner_short)
# TODO: add to export-config and configure
if args.set_canned_message:
closeNow = True
print(f"Setting canned plugin message to {args.set_canned_message}")
interface.getNode(args.dest).set_canned_message(args.set_canned_message)
if args.pos_fields:
# If --pos-fields invoked with args, set position fields
closeNow = True
@@ -510,6 +520,11 @@ def onConnected(interface):
print(f"Writing modified channels to device")
interface.getNode(args.dest).writeChannel(channelIndex)
if args.get_canned_message:
closeNow = True
print("")
interface.getNode(args.dest).get_canned_message()
if args.info:
print("")
# If we aren't trying to talk to our local node, don't show it
@@ -746,6 +761,9 @@ def initParser():
parser.add_argument("--info", help="Read and display the radio config information",
action="store_true")
parser.add_argument("--get-canned-message", help="Show the canned message plugin message",
action="store_true")
parser.add_argument("--nodes", help="Print Node List in a pretty formatted table",
action="store_true")
@@ -808,6 +826,9 @@ def initParser():
parser.add_argument(
"--set-owner", help="Set device owner name", action="store")
parser.add_argument(
"--set-canned-message", help="Set the canned messages plugin message (up to 1000 characters).", action="store")
parser.add_argument(
"--set-owner-short", help="Set device owner short name", action="store")

View File

@@ -12,13 +12,12 @@ from google.protobuf import symbol_database as _symbol_database
_sym_db = _symbol_database.Default()
from . import cannedmessages_pb2 as cannedmessages__pb2
from . import channel_pb2 as channel__pb2
from . import mesh_pb2 as mesh__pb2
from . import radioconfig_pb2 as radioconfig__pb2
DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x0b\x61\x64min.proto\x1a\x14\x63\x61nnedmessages.proto\x1a\rchannel.proto\x1a\nmesh.proto\x1a\x11radioconfig.proto\"\x87\x0c\n\x0c\x41\x64minMessage\x12!\n\tset_radio\x18\x01 \x01(\x0b\x32\x0c.RadioConfigH\x00\x12\x1a\n\tset_owner\x18\x02 \x01(\x0b\x32\x05.UserH\x00\x12\x1f\n\x0bset_channel\x18\x03 \x01(\x0b\x32\x08.ChannelH\x00\x12\x1b\n\x11get_radio_request\x18\x04 \x01(\x08H\x00\x12*\n\x12get_radio_response\x18\x05 \x01(\x0b\x32\x0c.RadioConfigH\x00\x12\x1d\n\x13get_channel_request\x18\x06 \x01(\rH\x00\x12(\n\x14get_channel_response\x18\x07 \x01(\x0b\x32\x08.ChannelH\x00\x12\x1b\n\x11get_owner_request\x18\x08 \x01(\x08H\x00\x12#\n\x12get_owner_response\x18\t \x01(\x0b\x32\x05.UserH\x00\x12\x1d\n\x13\x63onfirm_set_channel\x18 \x01(\x08H\x00\x12\x1b\n\x11\x63onfirm_set_radio\x18! \x01(\x08H\x00\x12\x18\n\x0e\x65xit_simulator\x18\" \x01(\x08H\x00\x12\x18\n\x0ereboot_seconds\x18# \x01(\x05H\x00\x12\x31\n\'get_canned_message_plugin_part1_request\x18$ \x01(\x08H\x00\x12T\n(get_canned_message_plugin_part1_response\x18% \x01(\x0b\x32 .CannedMessagePluginMessagePart1H\x00\x12\x31\n\'get_canned_message_plugin_part2_request\x18& \x01(\x08H\x00\x12T\n(get_canned_message_plugin_part2_response\x18\' \x01(\x0b\x32 .CannedMessagePluginMessagePart2H\x00\x12\x31\n\'get_canned_message_plugin_part3_request\x18( \x01(\x08H\x00\x12T\n(get_canned_message_plugin_part3_response\x18) \x01(\x0b\x32 .CannedMessagePluginMessagePart3H\x00\x12\x31\n\'get_canned_message_plugin_part4_request\x18* \x01(\x08H\x00\x12T\n(get_canned_message_plugin_part4_response\x18+ \x01(\x0b\x32 .CannedMessagePluginMessagePart4H\x00\x12\x31\n\'get_canned_message_plugin_part5_request\x18, \x01(\x08H\x00\x12T\n(get_canned_message_plugin_part5_response\x18- \x01(\x0b\x32 .CannedMessagePluginMessagePart5H\x00\x12K\n\x1fset_canned_message_plugin_part1\x18. \x01(\x0b\x32 .CannedMessagePluginMessagePart1H\x00\x12K\n\x1fset_canned_message_plugin_part2\x18/ \x01(\x0b\x32 .CannedMessagePluginMessagePart2H\x00\x12K\n\x1fset_canned_message_plugin_part3\x18\x30 \x01(\x0b\x32 .CannedMessagePluginMessagePart3H\x00\x12K\n\x1fset_canned_message_plugin_part4\x18\x31 \x01(\x0b\x32 .CannedMessagePluginMessagePart4H\x00\x12K\n\x1fset_canned_message_plugin_part5\x18\x32 \x01(\x0b\x32 .CannedMessagePluginMessagePart5H\x00\x12\x1a\n\x10shutdown_seconds\x18\x33 \x01(\x05H\x00\x42\t\n\x07variantBG\n\x13\x63om.geeksville.meshB\x0b\x41\x64minProtosH\x03Z!github.com/meshtastic/gomeshprotob\x06proto3')
DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x0b\x61\x64min.proto\x1a\rchannel.proto\x1a\nmesh.proto\x1a\x11radioconfig.proto\"\xa1\x08\n\x0c\x41\x64minMessage\x12!\n\tset_radio\x18\x01 \x01(\x0b\x32\x0c.RadioConfigH\x00\x12\x1a\n\tset_owner\x18\x02 \x01(\x0b\x32\x05.UserH\x00\x12\x1f\n\x0bset_channel\x18\x03 \x01(\x0b\x32\x08.ChannelH\x00\x12\x1b\n\x11get_radio_request\x18\x04 \x01(\x08H\x00\x12*\n\x12get_radio_response\x18\x05 \x01(\x0b\x32\x0c.RadioConfigH\x00\x12\x1d\n\x13get_channel_request\x18\x06 \x01(\rH\x00\x12(\n\x14get_channel_response\x18\x07 \x01(\x0b\x32\x08.ChannelH\x00\x12\x1b\n\x11get_owner_request\x18\x08 \x01(\x08H\x00\x12#\n\x12get_owner_response\x18\t \x01(\x0b\x32\x05.UserH\x00\x12\x1d\n\x13\x63onfirm_set_channel\x18 \x01(\x08H\x00\x12\x1b\n\x11\x63onfirm_set_radio\x18! \x01(\x08H\x00\x12\x18\n\x0e\x65xit_simulator\x18\" \x01(\x08H\x00\x12\x18\n\x0ereboot_seconds\x18# \x01(\x05H\x00\x12\x31\n\'get_canned_message_plugin_part1_request\x18$ \x01(\x08H\x00\x12\x32\n(get_canned_message_plugin_part1_response\x18% \x01(\tH\x00\x12\x31\n\'get_canned_message_plugin_part2_request\x18& \x01(\x08H\x00\x12\x32\n(get_canned_message_plugin_part2_response\x18\' \x01(\tH\x00\x12\x31\n\'get_canned_message_plugin_part3_request\x18( \x01(\x08H\x00\x12\x32\n(get_canned_message_plugin_part3_response\x18) \x01(\tH\x00\x12\x31\n\'get_canned_message_plugin_part4_request\x18* \x01(\x08H\x00\x12\x32\n(get_canned_message_plugin_part4_response\x18+ \x01(\tH\x00\x12)\n\x1fset_canned_message_plugin_part1\x18, \x01(\tH\x00\x12)\n\x1fset_canned_message_plugin_part2\x18- \x01(\tH\x00\x12)\n\x1fset_canned_message_plugin_part3\x18. \x01(\tH\x00\x12)\n\x1fset_canned_message_plugin_part4\x18/ \x01(\tH\x00\x12\x1a\n\x10shutdown_seconds\x18\x33 \x01(\x05H\x00\x42\t\n\x07variantBG\n\x13\x63om.geeksville.meshB\x0b\x41\x64minProtosH\x03Z!github.com/meshtastic/gomeshprotob\x06proto3')
@@ -34,6 +33,6 @@ if _descriptor._USE_C_DESCRIPTORS == False:
DESCRIPTOR._options = None
DESCRIPTOR._serialized_options = b'\n\023com.geeksville.meshB\013AdminProtosH\003Z!github.com/meshtastic/gomeshproto'
_ADMINMESSAGE._serialized_start=84
_ADMINMESSAGE._serialized_end=1627
_ADMINMESSAGE._serialized_start=62
_ADMINMESSAGE._serialized_end=1119
# @@protoc_insertion_point(module_scope)

View File

@@ -14,62 +14,22 @@ _sym_db = _symbol_database.Default()
DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x14\x63\x61nnedmessages.proto\"/\n\x1f\x43\x61nnedMessagePluginMessagePart1\x12\x0c\n\x04text\x18\x01 \x01(\t\"/\n\x1f\x43\x61nnedMessagePluginMessagePart2\x12\x0c\n\x04text\x18\x01 \x01(\t\"/\n\x1f\x43\x61nnedMessagePluginMessagePart3\x12\x0c\n\x04text\x18\x01 \x01(\t\"/\n\x1f\x43\x61nnedMessagePluginMessagePart4\x12\x0c\n\x04text\x18\x01 \x01(\t\"/\n\x1f\x43\x61nnedMessagePluginMessagePart5\x12\x0c\n\x04text\x18\x01 \x01(\tBU\n\x13\x63om.geeksville.meshB\x19\x43\x61nnedMessageConfigProtosH\x03Z!github.com/meshtastic/gomeshprotob\x06proto3')
DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x14\x63\x61nnedmessages.proto\"w\n\x19\x43\x61nnedMessagePluginConfig\x12\x15\n\rmessagesPart1\x18\x0b \x01(\t\x12\x15\n\rmessagesPart2\x18\x0c \x01(\t\x12\x15\n\rmessagesPart3\x18\r \x01(\t\x12\x15\n\rmessagesPart4\x18\x0e \x01(\tBU\n\x13\x63om.geeksville.meshB\x19\x43\x61nnedMessageConfigProtosH\x03Z!github.com/meshtastic/gomeshprotob\x06proto3')
_CANNEDMESSAGEPLUGINMESSAGEPART1 = DESCRIPTOR.message_types_by_name['CannedMessagePluginMessagePart1']
_CANNEDMESSAGEPLUGINMESSAGEPART2 = DESCRIPTOR.message_types_by_name['CannedMessagePluginMessagePart2']
_CANNEDMESSAGEPLUGINMESSAGEPART3 = DESCRIPTOR.message_types_by_name['CannedMessagePluginMessagePart3']
_CANNEDMESSAGEPLUGINMESSAGEPART4 = DESCRIPTOR.message_types_by_name['CannedMessagePluginMessagePart4']
_CANNEDMESSAGEPLUGINMESSAGEPART5 = DESCRIPTOR.message_types_by_name['CannedMessagePluginMessagePart5']
CannedMessagePluginMessagePart1 = _reflection.GeneratedProtocolMessageType('CannedMessagePluginMessagePart1', (_message.Message,), {
'DESCRIPTOR' : _CANNEDMESSAGEPLUGINMESSAGEPART1,
_CANNEDMESSAGEPLUGINCONFIG = DESCRIPTOR.message_types_by_name['CannedMessagePluginConfig']
CannedMessagePluginConfig = _reflection.GeneratedProtocolMessageType('CannedMessagePluginConfig', (_message.Message,), {
'DESCRIPTOR' : _CANNEDMESSAGEPLUGINCONFIG,
'__module__' : 'cannedmessages_pb2'
# @@protoc_insertion_point(class_scope:CannedMessagePluginMessagePart1)
# @@protoc_insertion_point(class_scope:CannedMessagePluginConfig)
})
_sym_db.RegisterMessage(CannedMessagePluginMessagePart1)
CannedMessagePluginMessagePart2 = _reflection.GeneratedProtocolMessageType('CannedMessagePluginMessagePart2', (_message.Message,), {
'DESCRIPTOR' : _CANNEDMESSAGEPLUGINMESSAGEPART2,
'__module__' : 'cannedmessages_pb2'
# @@protoc_insertion_point(class_scope:CannedMessagePluginMessagePart2)
})
_sym_db.RegisterMessage(CannedMessagePluginMessagePart2)
CannedMessagePluginMessagePart3 = _reflection.GeneratedProtocolMessageType('CannedMessagePluginMessagePart3', (_message.Message,), {
'DESCRIPTOR' : _CANNEDMESSAGEPLUGINMESSAGEPART3,
'__module__' : 'cannedmessages_pb2'
# @@protoc_insertion_point(class_scope:CannedMessagePluginMessagePart3)
})
_sym_db.RegisterMessage(CannedMessagePluginMessagePart3)
CannedMessagePluginMessagePart4 = _reflection.GeneratedProtocolMessageType('CannedMessagePluginMessagePart4', (_message.Message,), {
'DESCRIPTOR' : _CANNEDMESSAGEPLUGINMESSAGEPART4,
'__module__' : 'cannedmessages_pb2'
# @@protoc_insertion_point(class_scope:CannedMessagePluginMessagePart4)
})
_sym_db.RegisterMessage(CannedMessagePluginMessagePart4)
CannedMessagePluginMessagePart5 = _reflection.GeneratedProtocolMessageType('CannedMessagePluginMessagePart5', (_message.Message,), {
'DESCRIPTOR' : _CANNEDMESSAGEPLUGINMESSAGEPART5,
'__module__' : 'cannedmessages_pb2'
# @@protoc_insertion_point(class_scope:CannedMessagePluginMessagePart5)
})
_sym_db.RegisterMessage(CannedMessagePluginMessagePart5)
_sym_db.RegisterMessage(CannedMessagePluginConfig)
if _descriptor._USE_C_DESCRIPTORS == False:
DESCRIPTOR._options = None
DESCRIPTOR._serialized_options = b'\n\023com.geeksville.meshB\031CannedMessageConfigProtosH\003Z!github.com/meshtastic/gomeshproto'
_CANNEDMESSAGEPLUGINMESSAGEPART1._serialized_start=24
_CANNEDMESSAGEPLUGINMESSAGEPART1._serialized_end=71
_CANNEDMESSAGEPLUGINMESSAGEPART2._serialized_start=73
_CANNEDMESSAGEPLUGINMESSAGEPART2._serialized_end=120
_CANNEDMESSAGEPLUGINMESSAGEPART3._serialized_start=122
_CANNEDMESSAGEPLUGINMESSAGEPART3._serialized_end=169
_CANNEDMESSAGEPLUGINMESSAGEPART4._serialized_start=171
_CANNEDMESSAGEPLUGINMESSAGEPART4._serialized_end=218
_CANNEDMESSAGEPLUGINMESSAGEPART5._serialized_start=220
_CANNEDMESSAGEPLUGINMESSAGEPART5._serialized_end=267
_CANNEDMESSAGEPLUGINCONFIG._serialized_start=24
_CANNEDMESSAGEPLUGINCONFIG._serialized_end=143
# @@protoc_insertion_point(module_scope)

View File

@@ -17,7 +17,7 @@ from . import mesh_pb2 as mesh__pb2
from . import radioconfig_pb2 as radioconfig__pb2
DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x10\x64\x65viceonly.proto\x1a\rchannel.proto\x1a\nmesh.proto\x1a\x11radioconfig.proto\"\x80\x01\n\x11LegacyRadioConfig\x12\x39\n\x0bpreferences\x18\x01 \x01(\x0b\x32$.LegacyRadioConfig.LegacyPreferences\x1a\x30\n\x11LegacyPreferences\x12\x1b\n\x06region\x18\x0f \x01(\x0e\x32\x0b.RegionCode\"\xf0\x03\n\x0b\x44\x65viceState\x12\'\n\x0blegacyRadio\x18\x01 \x01(\x0b\x32\x12.LegacyRadioConfig\x12\x1c\n\x07my_node\x18\x02 \x01(\x0b\x32\x0b.MyNodeInfo\x12\x14\n\x05owner\x18\x03 \x01(\x0b\x32\x05.User\x12\x1a\n\x07node_db\x18\x04 \x03(\x0b\x32\t.NodeInfo\x12\"\n\rreceive_queue\x18\x05 \x03(\x0b\x32\x0b.MeshPacket\x12\x0f\n\x07version\x18\x08 \x01(\r\x12$\n\x0frx_text_message\x18\x07 \x01(\x0b\x32\x0b.MeshPacket\x12\x0f\n\x07no_save\x18\t \x01(\x08\x12\x15\n\rdid_gps_reset\x18\x0b \x01(\x08\x12+\n#canned_message_plugin_message_part1\x18\r \x01(\t\x12+\n#canned_message_plugin_message_part2\x18\x0e \x01(\t\x12+\n#canned_message_plugin_message_part3\x18\x0f \x01(\t\x12+\n#canned_message_plugin_message_part4\x18\x10 \x01(\t\x12+\n#canned_message_plugin_message_part5\x18\x11 \x01(\tJ\x04\x08\x0c\x10\r\")\n\x0b\x43hannelFile\x12\x1a\n\x08\x63hannels\x18\x01 \x03(\x0b\x32\x08.ChannelBF\n\x13\x63om.geeksville.meshB\nDeviceOnlyH\x03Z!github.com/meshtastic/gomeshprotob\x06proto3')
DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x10\x64\x65viceonly.proto\x1a\rchannel.proto\x1a\nmesh.proto\x1a\x11radioconfig.proto\"\x80\x01\n\x11LegacyRadioConfig\x12\x39\n\x0bpreferences\x18\x01 \x01(\x0b\x32$.LegacyRadioConfig.LegacyPreferences\x1a\x30\n\x11LegacyPreferences\x12\x1b\n\x06region\x18\x0f \x01(\x0e\x32\x0b.RegionCode\"\x8f\x02\n\x0b\x44\x65viceState\x12\'\n\x0blegacyRadio\x18\x01 \x01(\x0b\x32\x12.LegacyRadioConfig\x12\x1c\n\x07my_node\x18\x02 \x01(\x0b\x32\x0b.MyNodeInfo\x12\x14\n\x05owner\x18\x03 \x01(\x0b\x32\x05.User\x12\x1a\n\x07node_db\x18\x04 \x03(\x0b\x32\t.NodeInfo\x12\"\n\rreceive_queue\x18\x05 \x03(\x0b\x32\x0b.MeshPacket\x12\x0f\n\x07version\x18\x08 \x01(\r\x12$\n\x0frx_text_message\x18\x07 \x01(\x0b\x32\x0b.MeshPacket\x12\x0f\n\x07no_save\x18\t \x01(\x08\x12\x15\n\rdid_gps_reset\x18\x0b \x01(\x08J\x04\x08\x0c\x10\r\")\n\x0b\x43hannelFile\x12\x1a\n\x08\x63hannels\x18\x01 \x03(\x0b\x32\x08.ChannelBF\n\x13\x63om.geeksville.meshB\nDeviceOnlyH\x03Z!github.com/meshtastic/gomeshprotob\x06proto3')
@@ -63,7 +63,7 @@ if _descriptor._USE_C_DESCRIPTORS == False:
_LEGACYRADIOCONFIG_LEGACYPREFERENCES._serialized_start=147
_LEGACYRADIOCONFIG_LEGACYPREFERENCES._serialized_end=195
_DEVICESTATE._serialized_start=198
_DEVICESTATE._serialized_end=694
_CHANNELFILE._serialized_start=696
_CHANNELFILE._serialized_end=737
_DEVICESTATE._serialized_end=469
_CHANNELFILE._serialized_start=471
_CHANNELFILE._serialized_end=512
# @@protoc_insertion_point(module_scope)

View File

@@ -254,11 +254,12 @@ class MeshInterface:
meshPacket.decoded.payload = data
meshPacket.decoded.portnum = portNum
meshPacket.decoded.want_response = wantResponse
meshPacket.id = self._generatePacketId()
if onResponse is not None:
self._addResponseHandler(meshPacket.id, onResponse)
p = self._sendPacket(meshPacket, destinationId,
wantAck=wantAck, hopLimit=hopLimit)
if onResponse is not None:
self._addResponseHandler(p.id, onResponse)
return p
def sendPosition(self, latitude=0.0, longitude=0.0, altitude=0, timeSec=0,
@@ -631,11 +632,13 @@ class MeshInterface:
# asObj = DotMap(asDict)
topic = "meshtastic.receive" # Generic unknown packet type
decoded = asDict["decoded"]
# The default MessageToDict converts byte arrays into base64 strings.
# We don't want that - it messes up data payload. So slam in the correct
# byte array.
decoded["payload"] = meshPacket.decoded.payload
decoded = None
if 'decoded' in asDict:
decoded = asDict["decoded"]
# The default MessageToDict converts byte arrays into base64 strings.
# We don't want that - it messes up data payload. So slam in the correct
# byte array.
decoded["payload"] = meshPacket.decoded.payload
# UNKNOWN_APP is the default protobuf portnum value, and therefore if not
# set it will not be populated at all to make API usage easier, set

View File

File diff suppressed because one or more lines are too long

View File

@@ -3,6 +3,7 @@
import logging
import base64
import time
from google.protobuf.json_format import MessageToJson
from meshtastic import portnums_pb2, apponly_pb2, admin_pb2, channel_pb2
from meshtastic.util import pskToString, stripnl, Timeout, our_exit, fromPSK
@@ -24,6 +25,15 @@ class Node:
self.partialChannels = None
self.noProto = noProto
self.cannedPluginMessage = None
self.cannedPluginMessagePart1 = None
self.cannedPluginMessagePart2 = None
self.cannedPluginMessagePart3 = None
self.cannedPluginMessagePart4 = None
self.gotResponse = None
def showChannels(self):
"""Show human readable description of our channels."""
print("Channels:")
@@ -56,6 +66,14 @@ class Node:
self.channels = None
self.partialChannels = [] # We keep our channels in a temp array until finished
# Note: We do not get the canned plugin message, unless get_canned_message() is called
self.cannedPluginMessage = None
self.cannedPluginMessagePart1 = None
self.cannedPluginMessagePart2 = None
self.cannedPluginMessagePart3 = None
self.cannedPluginMessagePart4 = None
self._requestSettings()
def turnOffEncryptionOnPrimaryChannel(self):
@@ -64,9 +82,9 @@ class Node:
print("Writing modified channels to device")
self.writeChannel(0)
def waitForConfig(self):
def waitForConfig(self, attribute='channels'):
"""Block until radio config is received. Returns True if config has been received."""
return self._timeout.waitForSet(self, attrs=('radioConfig', 'channels'))
return self._timeout.waitForSet(self, attrs=('radioConfig', attribute))
def writeConfig(self):
"""Write the current (edited) radioConfig to the device"""
@@ -237,7 +255,7 @@ class Node:
"""Handle the response packet for requesting settings _requestSettings()"""
logging.debug(f'onResponseRequestSetting() p:{p}')
errorFound = False
if 'routing' in p["decoded"]:
if "routing" in p["decoded"]:
if p["decoded"]["routing"]["errorReason"] != "NONE":
errorFound = True
print(f'Error on response: {p["decoded"]["routing"]["errorReason"]}')
@@ -268,6 +286,156 @@ class Node:
return self._sendAdmin(p, wantResponse=True, onResponse=self.onResponseRequestSettings)
def onResponseRequestCannedMessagePluginMessagePart1(self, p):
"""Handle the response packet for requesting canned message plugin message part 1"""
logging.debug(f'onResponseRequestCannedMessagePluginMessagePart1() p:{p}')
errorFound = False
if "routing" in p["decoded"]:
if p["decoded"]["routing"]["errorReason"] != "NONE":
errorFound = True
print(f'Error on response: {p["decoded"]["routing"]["errorReason"]}')
if errorFound is False:
if "decoded" in p:
if "admin" in p["decoded"]:
if "raw" in p["decoded"]["admin"]:
self.cannedPluginMessagePart1 = p["decoded"]["admin"]["raw"].get_canned_message_plugin_part1_response
logging.debug(f'self.cannedPluginMessagePart1:{self.cannedPluginMessagePart1}')
self.gotResponse = True
def onResponseRequestCannedMessagePluginMessagePart2(self, p):
"""Handle the response packet for requesting canned message plugin message part 2"""
logging.debug(f'onResponseRequestCannedMessagePluginMessagePart2() p:{p}')
errorFound = False
if "routing" in p["decoded"]:
if p["decoded"]["routing"]["errorReason"] != "NONE":
errorFound = True
print(f'Error on response: {p["decoded"]["routing"]["errorReason"]}')
if errorFound is False:
if "decoded" in p:
if "admin" in p["decoded"]:
if "raw" in p["decoded"]["admin"]:
self.cannedPluginMessagePart2 = p["decoded"]["admin"]["raw"].get_canned_message_plugin_part2_response
logging.debug(f'self.cannedPluginMessagePart2:{self.cannedPluginMessagePart2}')
self.gotResponse = True
def onResponseRequestCannedMessagePluginMessagePart3(self, p):
"""Handle the response packet for requesting canned message plugin message part 3"""
logging.debug(f'onResponseRequestCannedMessagePluginMessagePart3() p:{p}')
errorFound = False
if "routing" in p["decoded"]:
if p["decoded"]["routing"]["errorReason"] != "NONE":
errorFound = True
print(f'Error on response: {p["decoded"]["routing"]["errorReason"]}')
if errorFound is False:
if "decoded" in p:
if "admin" in p["decoded"]:
if "raw" in p["decoded"]["admin"]:
self.cannedPluginMessagePart3 = p["decoded"]["admin"]["raw"].get_canned_message_plugin_part3_response
logging.debug(f'self.cannedPluginMessagePart3:{self.cannedPluginMessagePart3}')
self.gotResponse = True
def onResponseRequestCannedMessagePluginMessagePart4(self, p):
"""Handle the response packet for requesting canned message plugin message part 4"""
logging.debug(f'onResponseRequestCannedMessagePluginMessagePart4() p:{p}')
errorFound = False
if "routing" in p["decoded"]:
if p["decoded"]["routing"]["errorReason"] != "NONE":
errorFound = True
print(f'Error on response: {p["decoded"]["routing"]["errorReason"]}')
if errorFound is False:
if "decoded" in p:
if "admin" in p["decoded"]:
if "raw" in p["decoded"]["admin"]:
self.cannedPluginMessagePart4 = p["decoded"]["admin"]["raw"].get_canned_message_plugin_part4_response
logging.debug(f'self.cannedPluginMessagePart4:{self.cannedPluginMessagePart4}')
self.gotResponse = True
def get_canned_message(self):
"""Get the canned message string. Concatenate all pieces together and return a single string."""
logging.debug(f'in get_canned_message()')
if not self.cannedPluginMessage:
p1 = admin_pb2.AdminMessage()
p1.get_canned_message_plugin_part1_request = True
self.gotResponse = False
self._sendAdmin(p1, wantResponse=True, onResponse=self.onResponseRequestCannedMessagePluginMessagePart1)
while self.gotResponse is False:
time.sleep(0.1)
p2 = admin_pb2.AdminMessage()
p2.get_canned_message_plugin_part2_request = True
self.gotResponse = False
self._sendAdmin(p2, wantResponse=True, onResponse=self.onResponseRequestCannedMessagePluginMessagePart2)
while self.gotResponse is False:
time.sleep(0.1)
p3 = admin_pb2.AdminMessage()
p3.get_canned_message_plugin_part3_request = True
self.gotResponse = False
self._sendAdmin(p3, wantResponse=True, onResponse=self.onResponseRequestCannedMessagePluginMessagePart3)
while self.gotResponse is False:
time.sleep(0.1)
p4 = admin_pb2.AdminMessage()
p4.get_canned_message_plugin_part4_request = True
self.gotResponse = False
self._sendAdmin(p4, wantResponse=True, onResponse=self.onResponseRequestCannedMessagePluginMessagePart4)
while self.gotResponse is False:
time.sleep(0.1)
# TODO: This feels wrong to have a sleep here. Is there a way to ensure that
# all requests are complete? Perhaps change to a while loop any parts are None... maybe?
time.sleep(3)
logging.debug(f'self.cannedPluginMessagePart1:{self.cannedPluginMessagePart1}')
logging.debug(f'self.cannedPluginMessagePart2:{self.cannedPluginMessagePart2}')
logging.debug(f'self.cannedPluginMessagePart3:{self.cannedPluginMessagePart3}')
logging.debug(f'self.cannedPluginMessagePart4:{self.cannedPluginMessagePart4}')
self.cannedPluginMessage = ""
if self.cannedPluginMessagePart1:
self.cannedPluginMessage += self.cannedPluginMessagePart1
if self.cannedPluginMessagePart2:
self.cannedPluginMessage += self.cannedPluginMessagePart2
if self.cannedPluginMessagePart3:
self.cannedPluginMessage += self.cannedPluginMessagePart3
if self.cannedPluginMessagePart4:
self.cannedPluginMessage += self.cannedPluginMessagePart4
print(f'canned_plugin_message:{self.cannedPluginMessage}')
logging.debug(f'canned_plugin_message:{self.cannedPluginMessage}')
return self.cannedPluginMessage
def set_canned_message(self, message):
"""Set the canned message. Split into parts of 200 chars each."""
if len(message) > 800:
our_exit("Warning: The canned message must be less than 800 characters.")
# split into chunks
chunks = []
chunks_size = 200
for i in range(0, len(message), chunks_size):
chunks.append(message[i: i + chunks_size])
# for each chunk, send a message to set the values
#for i in range(0, len(chunks)):
for i, chunk in enumerate(chunks):
p = admin_pb2.AdminMessage()
# TODO: should be a way to improve this
if i == 0:
p.set_canned_message_plugin_part1 = chunk
elif i == 1:
p.set_canned_message_plugin_part2 = chunk
elif i == 2:
p.set_canned_message_plugin_part3 = chunk
elif i == 3:
p.set_canned_message_plugin_part4 = chunk
logging.debug(f"Setting canned message '{chunk}' part {i+1}")
self._sendAdmin(p)
def exitSimulator(self):
"""Tell a simulator node to exit (this message
is ignored for other nodes)"""

View File

File diff suppressed because one or more lines are too long

View File

@@ -2,10 +2,6 @@
It is used for auto detection as to which device might be connected.
"""
import platform
import subprocess
import re
# Goal is to detect which device and port to use from the supported devices
# without installing any libraries that are not currently in the python meshtastic library
@@ -42,13 +38,13 @@ tbeam_M8N = SupportedDevice(name="T-Beam", version="M8N", for_firmware="tbeam",
tbeam_M8N_SX1262 = SupportedDevice(name="T-Beam", version="M8N_SX1262", for_firmware="tbeam",
baseport_on_linux="ttyACM", baseport_on_mac="cu.usbmodem",
usb_vendor_id_in_hex="1a86", usb_product_id_in_hex="55d4")
tlora_v1_1 = SupportedDevice(name="T-Lora", version="1.1", for_firmware="tlora-v1",
tlora_v1 = SupportedDevice(name="T-Lora", version="1", for_firmware="tlora-v1",
baseport_on_linux="ttyUSB", baseport_on_mac="cu.usbserial",
usb_vendor_id_in_hex="10c4", usb_product_id_in_hex="ea60")
usb_vendor_id_in_hex="1a86", usb_product_id_in_hex="55d4")
tlora_v1_3 = SupportedDevice(name="T-Lora", version="1.3", for_firmware="tlora-v1-3",
baseport_on_linux="ttyUSB", baseport_on_mac="cu.usbserial",
usb_vendor_id_in_hex="10c4", usb_product_id_in_hex="ea60")
tlora_v2_0 = SupportedDevice(name="T-Lora", version="2.0", for_firmware="tlora-v2-1",
tlora_v2 = SupportedDevice(name="T-Lora", version="2", for_firmware="tlora-v2",
baseport_on_linux="ttyACM", baseport_on_mac="cu.usbmodem",
usb_vendor_id_in_hex="1a86", usb_product_id_in_hex="55d4")
tlora_v2_1 = SupportedDevice(name="T-Lora", version="2.1", for_firmware="tlora-v2-1",
@@ -85,112 +81,12 @@ rak4631_19003 = SupportedDevice(name="RAK 4631 19003", version="", for_firmware=
device_class="nrf52",
baseport_on_linux="ttyACM", baseport_on_mac="cu.usbmodem",
usb_vendor_id_in_hex="239a", usb_product_id_in_hex="8029")
nano_g1 = SupportedDevice(name="Nano G1", version="", for_firmware="nano-g1",
baseport_on_linux="ttyACM", baseport_on_mac="cu.usbmodem",
usb_vendor_id_in_hex="1a86", usb_product_id_in_hex="55d4")
supported_devices = [tbeam_v0_7, tbeam_v1_1, tbeam_M8N, tbeam_M8N_SX1262,
tlora_v1_1, tlora_v1_3, tlora_v2_0, tlora_v2_1, tlora_v2_1_1_6,
tlora_v1, tlora_v1_3, tlora_v2, tlora_v2_1, tlora_v2_1_1_6,
heltec_v1, heltec_v2_0, heltec_v2_1,
meshtastic_diy_v1, techo_1, rak4631_5005, rak4631_19003,
rak11200]
def get_unique_vendor_ids():
"""Return a set of unique vendor ids"""
vids = set()
for d in supported_devices:
if d.usb_vendor_id_in_hex:
vids.add(d.usb_vendor_id_in_hex)
return vids
def get_devices_with_vendor_id(vid):
"""Return a set of unique devices with the vendor id"""
sd = set()
for d in supported_devices:
if d.usb_vendor_id_in_hex == vid:
sd.add(d)
return sd
def active_ports_on_supported_devices(sds):
"""Return a set of active ports based on the supplied supported devices"""
ports = set()
baseports = set()
system = platform.system()
# figure out what possible base ports there are
for d in sds:
if system == "Linux":
baseports.add(d.baseport_on_linux)
elif system == "Darwin":
baseports.add(d.baseport_on_mac)
elif system == "Windows":
baseports.add(d.baseport_on_windows)
for bp in baseports:
if system == "Linux":
# see if we have any devices (ignoring any stderr output)
command = f'ls -al /dev/{bp}* 2> /dev/null'
#print(f'command:{command}')
_, ls_output = subprocess.getstatusoutput(command)
#print(f'ls_output:{ls_output}')
# if we got output, there are ports
if len(ls_output) > 0:
#print('got output')
# for each line of output
lines = ls_output.split('\n')
#print(f'lines:{lines}')
for line in lines:
parts = line.split(' ')
#print(f'parts:{parts}')
port = parts[-1]
#print(f'port:{port}')
ports.add(port)
elif system == "Darwin":
# see if we have any devices (ignoring any stderr output)
command = f'ls -al /dev/{bp}* 2> /dev/null'
#print(f'command:{command}')
_, ls_output = subprocess.getstatusoutput(command)
#print(f'ls_output:{ls_output}')
# if we got output, there are ports
if len(ls_output) > 0:
#print('got output')
# for each line of output
lines = ls_output.split('\n')
#print(f'lines:{lines}')
for line in lines:
parts = line.split(' ')
#print(f'parts:{parts}')
port = parts[-1]
#print(f'port:{port}')
ports.add(port)
elif system == "Windows":
# for each device in supported devices found
for d in sds:
# find the port(s)
com_ports = detect_windows_port(d)
#print(f'com_ports:{com_ports}')
# add all ports
for com_port in com_ports:
ports.add(com_port)
return ports
def detect_windows_port(sd):
"""detect if Windows port"""
ports = set()
if sd:
system = platform.system()
if system == "Windows":
command = ('powershell.exe "[Console]::OutputEncoding = [Text.UTF8Encoding]::UTF8;'
'Get-PnpDevice -PresentOnly | Where-Object{ ($_.DeviceId -like ')
command += f"'*{sd.usb_vendor_id_in_hex.upper()}*'"
command += ')} | Format-List"'
#print(f'command:{command}')
_, sp_output = subprocess.getstatusoutput(command)
#print(f'sp_output:{sp_output}')
p = re.compile(r'\(COM(.*)\)')
for x in p.findall(sp_output):
#print(f'x:{x}')
ports.add(f'COM{x}')
return ports
rak11200, nano_g1]

View File

@@ -442,6 +442,43 @@ def test_main_set_owner_short_to_bob(capsys):
mo.assert_called()
@pytest.mark.unit
@pytest.mark.usefixtures("reset_globals")
def test_main_set_canned_messages(capsys):
"""Test --set-canned-message """
sys.argv = ['', '--set-canned-message', 'foo']
Globals.getInstance().set_args(sys.argv)
iface = MagicMock(autospec=SerialInterface)
with patch('meshtastic.serial_interface.SerialInterface', return_value=iface) as mo:
main()
out, err = capsys.readouterr()
assert re.search(r'Connected to radio', out, re.MULTILINE)
assert re.search(r'Setting canned plugin message to foo', out, re.MULTILINE)
assert err == ''
mo.assert_called()
@pytest.mark.unit
@pytest.mark.usefixtures("reset_globals")
def test_main_get_canned_messages(capsys, caplog, iface_with_nodes):
"""Test --get-canned-message """
sys.argv = ['', '--get-canned-message']
Globals.getInstance().set_args(sys.argv)
iface = iface_with_nodes
iface.localNode.cannedPluginMessage = 'foo'
with caplog.at_level(logging.DEBUG):
with patch('meshtastic.serial_interface.SerialInterface', return_value=iface) as mo:
main()
out, err = capsys.readouterr()
assert re.search(r'Connected to radio', out, re.MULTILINE)
assert re.search(r'canned_plugin_message:foo', out, re.MULTILINE)
assert err == ''
mo.assert_called()
@pytest.mark.unit
@pytest.mark.usefixtures("reset_globals")
def test_main_set_ham_to_KI123(capsys):
@@ -865,6 +902,27 @@ def test_main_set_valid_wifi_passwd(capsys):
mo.assert_called()
@pytest.mark.unit
@pytest.mark.usefixtures("reset_globals")
def test_main_set_invalid_wifi_passwd(capsys):
"""Test --set with an invalid value (password must be 8 or more characters)"""
sys.argv = ['', '--set', 'wifi_password', '1234567']
Globals.getInstance().set_args(sys.argv)
mocked_node = MagicMock(autospec=Node)
iface = MagicMock(autospec=SerialInterface)
iface.getNode.return_value = mocked_node
with patch('meshtastic.serial_interface.SerialInterface', return_value=iface) as mo:
main()
out, err = capsys.readouterr()
assert re.search(r'Connected to radio', out, re.MULTILINE)
assert not re.search(r'Set wifi_password to 1234567', out, re.MULTILINE)
assert re.search(r'Warning: wifi_password must be 8 or more characters.', out, re.MULTILINE)
assert err == ''
mo.assert_called()
@pytest.mark.unit
@pytest.mark.usefixtures("reset_globals")
def test_main_set_valid_camel_case(capsys):

View File

@@ -11,6 +11,9 @@ from ..serial_interface import SerialInterface
from ..admin_pb2 import AdminMessage
from ..channel_pb2 import Channel
from ..radioconfig_pb2 import RadioConfig
#from ..cannedmessages_pb2 import (CannedMessagePluginMessagePart1, CannedMessagePluginMessagePart2,
# CannedMessagePluginMessagePart3, CannedMessagePluginMessagePart4,
# CannedMessagePluginMessagePart5)
from ..util import Timeout
@@ -43,6 +46,122 @@ def test_node_requestConfig(capsys):
assert err == ''
#@pytest.mark.unit
#def test_node_get_canned_message_with_all_parts(capsys):
# """Test run get_canned_message()"""
# iface = MagicMock(autospec=SerialInterface)
# amesg = MagicMock(autospec=AdminMessage)
# with patch('meshtastic.serial_interface.SerialInterface', return_value=iface) as mo:
# with patch('meshtastic.admin_pb2.AdminMessage', return_value=amesg):
# # we have a sleep in this method, so override it so it goes fast
# with patch('time.sleep'):
# anode = Node(mo, 'bar')
# anode.cannedPluginMessagePart1 = 'a'
# anode.cannedPluginMessagePart2 = 'b'
# anode.cannedPluginMessagePart3 = 'c'
# anode.cannedPluginMessagePart4 = 'd'
# anode.cannedPluginMessagePart5 = 'e'
# anode.get_canned_message()
# out, err = capsys.readouterr()
# assert re.search(r'canned_plugin_message:abcde', out, re.MULTILINE)
# assert err == ''
#
#
#@pytest.mark.unit
#def test_node_get_canned_message_with_some_parts(capsys):
# """Test run get_canned_message()"""
# iface = MagicMock(autospec=SerialInterface)
# amesg = MagicMock(autospec=AdminMessage)
# with patch('meshtastic.serial_interface.SerialInterface', return_value=iface) as mo:
# with patch('meshtastic.admin_pb2.AdminMessage', return_value=amesg):
# # we have a sleep in this method, so override it so it goes fast
# with patch('time.sleep'):
# anode = Node(mo, 'bar')
# anode.cannedPluginMessagePart1 = 'a'
# anode.get_canned_message()
# out, err = capsys.readouterr()
# assert re.search(r'canned_plugin_message:a', out, re.MULTILINE)
# assert err == ''
#
#
#@pytest.mark.unit
#def test_node_set_canned_message_one_part(caplog):
# """Test run set_canned_message()"""
# iface = MagicMock(autospec=SerialInterface)
# amesg = MagicMock(autospec=AdminMessage)
# with caplog.at_level(logging.DEBUG):
# with patch('meshtastic.serial_interface.SerialInterface', return_value=iface) as mo:
# with patch('meshtastic.admin_pb2.AdminMessage', return_value=amesg):
# anode = Node(mo, 'bar')
# anode.set_canned_message('foo')
# assert re.search(r"Setting canned message 'foo' part 1", caplog.text, re.MULTILINE)
# assert not re.search(r"Setting canned message '' part 2", caplog.text, re.MULTILINE)
#
#
#@pytest.mark.unit
#def test_node_set_canned_message_200(caplog):
# """Test run set_canned_message() 200 characters long"""
# iface = MagicMock(autospec=SerialInterface)
# amesg = MagicMock(autospec=AdminMessage)
# with caplog.at_level(logging.DEBUG):
# with patch('meshtastic.serial_interface.SerialInterface', return_value=iface) as mo:
# with patch('meshtastic.admin_pb2.AdminMessage', return_value=amesg):
# anode = Node(mo, 'bar')
# message_200_chars_long = 'a' * 200
# anode.set_canned_message(message_200_chars_long)
# assert re.search(r" part 1", caplog.text, re.MULTILINE)
# assert not re.search(r"Setting canned message '' part 2", caplog.text, re.MULTILINE)
#
#
#@pytest.mark.unit
#def test_node_set_canned_message_201(caplog):
# """Test run set_canned_message() 201 characters long"""
# iface = MagicMock(autospec=SerialInterface)
# amesg = MagicMock(autospec=AdminMessage)
# with caplog.at_level(logging.DEBUG):
# with patch('meshtastic.serial_interface.SerialInterface', return_value=iface) as mo:
# with patch('meshtastic.admin_pb2.AdminMessage', return_value=amesg):
# anode = Node(mo, 'bar')
# message_201_chars_long = 'a' * 201
# anode.set_canned_message(message_201_chars_long)
# assert re.search(r" part 1", caplog.text, re.MULTILINE)
# assert re.search(r"Setting canned message 'a' part 2", caplog.text, re.MULTILINE)
#
#
#@pytest.mark.unit
#def test_node_set_canned_message_1000(caplog):
# """Test run set_canned_message() 1000 characters long"""
# iface = MagicMock(autospec=SerialInterface)
# amesg = MagicMock(autospec=AdminMessage)
# with caplog.at_level(logging.DEBUG):
# with patch('meshtastic.serial_interface.SerialInterface', return_value=iface) as mo:
# with patch('meshtastic.admin_pb2.AdminMessage', return_value=amesg):
# anode = Node(mo, 'bar')
# message_1000_chars_long = 'a' * 1000
# anode.set_canned_message(message_1000_chars_long)
# assert re.search(r" part 1", caplog.text, re.MULTILINE)
# assert re.search(r" part 2", caplog.text, re.MULTILINE)
# assert re.search(r" part 3", caplog.text, re.MULTILINE)
# assert re.search(r" part 4", caplog.text, re.MULTILINE)
# assert re.search(r" part 5", caplog.text, re.MULTILINE)
#
#
#@pytest.mark.unit
#def test_node_set_canned_message_1001(capsys):
# """Test run set_canned_message() 1001 characters long"""
# iface = MagicMock(autospec=SerialInterface)
# with pytest.raises(SystemExit) as pytest_wrapped_e:
# with patch('meshtastic.serial_interface.SerialInterface', return_value=iface) as mo:
# anode = Node(mo, 'bar')
# message_1001_chars_long = 'a' * 1001
# anode.set_canned_message(message_1001_chars_long)
# assert pytest_wrapped_e.type == SystemExit
# assert pytest_wrapped_e.value.code == 1
# out, err = capsys.readouterr()
# assert re.search(r'Warning: The canned message', out, re.MULTILINE)
# assert err == ''
@pytest.mark.unit
def test_setOwner_and_team(caplog):
"""Test setOwner"""
@@ -694,6 +813,352 @@ def test_requestChannel_localNode(caplog):
assert not re.search(r'from remote node', caplog.text, re.MULTILINE)
#@pytest.mark.unit
#def test_onResponseRequestCannedMessagePluginMesagePart1(caplog):
# """Test onResponseRequestCannedMessagePluginMessagePart1()"""
#
# part1 = CannedMessagePluginMessagePart1()
# part1.text = 'foo1'
#
# msg1 = MagicMock(autospec=AdminMessage)
# msg1.get_canned_message_plugin_part1_response = part1
#
# packet = {
# 'from': 682968612,
# 'to': 682968612,
# 'decoded': {
# 'portnum': 'ADMIN_APP',
# 'payload': 'faked',
# 'requestId': 927039000,
# 'admin': {
# 'getCannedMessagePluginPart1Response': {'text': 'foo1'},
# 'raw': msg1
# }
# },
# 'id': 589440320,
# 'rxTime': 1642710843,
# 'hopLimit': 3,
# 'priority': 'RELIABLE',
# 'raw': 'faked',
# 'fromId': '!28b54624',
# 'toId': '!28b54624'
# }
#
# iface = MagicMock(autospec=SerialInterface)
# with patch('meshtastic.serial_interface.SerialInterface', return_value=iface) as mo:
# anode = Node(mo, 'bar', noProto=True)
# # Note: Have to do this next line because every call to MagicMock object/method returns a new magic mock
# mo.localNode = anode
#
# with caplog.at_level(logging.DEBUG):
# anode.onResponseRequestCannedMessagePluginMessagePart1(packet)
# assert re.search(r'onResponseRequestCannedMessagePluginMessagePart1', caplog.text, re.MULTILINE)
# assert anode.cannedPluginMessagePart1 == 'foo1'
#@pytest.mark.unit
#def test_onResponseRequestCannedMessagePluginMesagePart2(caplog):
# """Test onResponseRequestCannedMessagePluginMessagePart2()"""
#
# part2 = CannedMessagePluginMessagePart2()
# part2.text = 'foo2'
#
# msg2 = MagicMock(autospec=AdminMessage)
# msg2.get_canned_message_plugin_part2_response = part2
#
# packet = {
# 'from': 682968612,
# 'to': 682968612,
# 'decoded': {
# 'portnum': 'ADMIN_APP',
# 'payload': 'faked',
# 'requestId': 927039000,
# 'admin': {
# 'getCannedMessagePluginPart2Response': {'text': 'foo2'},
# 'raw': msg2
# }
# },
# 'id': 589440320,
# 'rxTime': 1642710843,
# 'hopLimit': 3,
# 'priority': 'RELIABLE',
# 'raw': 'faked',
# 'fromId': '!28b54624',
# 'toId': '!28b54624'
# }
#
# iface = MagicMock(autospec=SerialInterface)
# with patch('meshtastic.serial_interface.SerialInterface', return_value=iface) as mo:
# anode = Node(mo, 'bar', noProto=True)
# # Note: Have to do this next line because every call to MagicMock object/method returns a new magic mock
# mo.localNode = anode
#
# with caplog.at_level(logging.DEBUG):
# anode.onResponseRequestCannedMessagePluginMessagePart2(packet)
# assert re.search(r'onResponseRequestCannedMessagePluginMessagePart2', caplog.text, re.MULTILINE)
# assert anode.cannedPluginMessagePart2 == 'foo2'
#@pytest.mark.unit
#def test_onResponseRequestCannedMessagePluginMesagePart3(caplog):
# """Test onResponseRequestCannedMessagePluginMessagePart3()"""
#
# part3 = CannedMessagePluginMessagePart3()
# part3.text = 'foo3'
#
# msg3 = MagicMock(autospec=AdminMessage)
# msg3.get_canned_message_plugin_part3_response = part3
#
# packet = {
# 'from': 682968612,
# 'to': 682968612,
# 'decoded': {
# 'portnum': 'ADMIN_APP',
# 'payload': 'faked',
# 'requestId': 927039000,
# 'admin': {
# 'getCannedMessagePluginPart3Response': {'text': 'foo3'},
# 'raw': msg3
# }
# },
# 'id': 589440320,
# 'rxTime': 1642710843,
# 'hopLimit': 3,
# 'priority': 'RELIABLE',
# 'raw': 'faked',
# 'fromId': '!28b54624',
# 'toId': '!28b54624'
# }
#
# iface = MagicMock(autospec=SerialInterface)
# with patch('meshtastic.serial_interface.SerialInterface', return_value=iface) as mo:
# anode = Node(mo, 'bar', noProto=True)
# # Note: Have to do this next line because every call to MagicMock object/method returns a new magic mock
# mo.localNode = anode
#
# with caplog.at_level(logging.DEBUG):
# anode.onResponseRequestCannedMessagePluginMessagePart3(packet)
# assert re.search(r'onResponseRequestCannedMessagePluginMessagePart3', caplog.text, re.MULTILINE)
# assert anode.cannedPluginMessagePart3 == 'foo3'
#@pytest.mark.unit
#def test_onResponseRequestCannedMessagePluginMesagePart4(caplog):
# """Test onResponseRequestCannedMessagePluginMessagePart4()"""
#
# part4 = CannedMessagePluginMessagePart4()
# part4.text = 'foo4'
#
# msg4 = MagicMock(autospec=AdminMessage)
# msg4.get_canned_message_plugin_part4_response = part4
#
# packet = {
# 'from': 682968612,
# 'to': 682968612,
# 'decoded': {
# 'portnum': 'ADMIN_APP',
# 'payload': 'faked',
# 'requestId': 927039000,
# 'admin': {
# 'getCannedMessagePluginPart4Response': {'text': 'foo4'},
# 'raw': msg4
# }
# },
# 'id': 589440320,
# 'rxTime': 1642710843,
# 'hopLimit': 3,
# 'priority': 'RELIABLE',
# 'raw': 'faked',
# 'fromId': '!28b54624',
# 'toId': '!28b54624'
# }
#
# iface = MagicMock(autospec=SerialInterface)
# with patch('meshtastic.serial_interface.SerialInterface', return_value=iface) as mo:
# anode = Node(mo, 'bar', noProto=True)
# # Note: Have to do this next line because every call to MagicMock object/method returns a new magic mock
# mo.localNode = anode
#
# with caplog.at_level(logging.DEBUG):
# anode.onResponseRequestCannedMessagePluginMessagePart4(packet)
# assert re.search(r'onResponseRequestCannedMessagePluginMessagePart4', caplog.text, re.MULTILINE)
# assert anode.cannedPluginMessagePart4 == 'foo4'
#@pytest.mark.unit
#def test_onResponseRequestCannedMessagePluginMesagePart5(caplog):
# """Test onResponseRequestCannedMessagePluginMessagePart5()"""
#
# part5 = CannedMessagePluginMessagePart5()
# part5.text = 'foo5'
#
# msg5 = MagicMock(autospec=AdminMessage)
# msg5.get_canned_message_plugin_part5_response = part5
#
#
# packet = {
# 'from': 682968612,
# 'to': 682968612,
# 'decoded': {
# 'portnum': 'ADMIN_APP',
# 'payload': 'faked',
# 'requestId': 927039000,
# 'admin': {
# 'getCannedMessagePluginPart5Response': {'text': 'foo5'},
# 'raw': msg5
# }
# },
# 'id': 589440320,
# 'rxTime': 1642710843,
# 'hopLimit': 3,
# 'priority': 'RELIABLE',
# 'raw': 'faked',
# 'fromId': '!28b54624',
# 'toId': '!28b54624'
# }
#
# iface = MagicMock(autospec=SerialInterface)
# with patch('meshtastic.serial_interface.SerialInterface', return_value=iface) as mo:
# anode = Node(mo, 'bar', noProto=True)
# # Note: Have to do this next line because every call to MagicMock object/method returns a new magic mock
# mo.localNode = anode
#
# with caplog.at_level(logging.DEBUG):
# anode.onResponseRequestCannedMessagePluginMessagePart5(packet)
# assert re.search(r'onResponseRequestCannedMessagePluginMessagePart5', caplog.text, re.MULTILINE)
# assert anode.cannedPluginMessagePart5 == 'foo5'
#@pytest.mark.unit
#def test_onResponseRequestCannedMessagePluginMesagePart1_error(caplog, capsys):
# """Test onResponseRequestCannedMessagePluginMessagePart1() with error"""
#
# packet = {
# 'decoded': {
# 'routing': {
# 'errorReason': 'some made up error',
# },
# },
# }
#
# iface = MagicMock(autospec=SerialInterface)
# with patch('meshtastic.serial_interface.SerialInterface', return_value=iface) as mo:
# anode = Node(mo, 'bar', noProto=True)
# # Note: Have to do this next line because every call to MagicMock object/method returns a new magic mock
# mo.localNode = anode
#
# with caplog.at_level(logging.DEBUG):
# anode.onResponseRequestCannedMessagePluginMessagePart1(packet)
# assert re.search(r'onResponseRequestCannedMessagePluginMessagePart1', caplog.text, re.MULTILINE)
# out, err = capsys.readouterr()
# assert re.search(r'Error on response', out)
# assert err == ''
#@pytest.mark.unit
#def test_onResponseRequestCannedMessagePluginMesagePart2_error(caplog, capsys):
# """Test onResponseRequestCannedMessagePluginMessagePart2() with error"""
#
# packet = {
# 'decoded': {
# 'routing': {
# 'errorReason': 'some made up error',
# },
# },
# }
#
# iface = MagicMock(autospec=SerialInterface)
# with patch('meshtastic.serial_interface.SerialInterface', return_value=iface) as mo:
# anode = Node(mo, 'bar', noProto=True)
# # Note: Have to do this next line because every call to MagicMock object/method returns a new magic mock
# mo.localNode = anode
#
# with caplog.at_level(logging.DEBUG):
# anode.onResponseRequestCannedMessagePluginMessagePart2(packet)
# assert re.search(r'onResponseRequestCannedMessagePluginMessagePart2', caplog.text, re.MULTILINE)
# out, err = capsys.readouterr()
# assert re.search(r'Error on response', out)
# assert err == ''
#@pytest.mark.unit
#def test_onResponseRequestCannedMessagePluginMesagePart3_error(caplog, capsys):
# """Test onResponseRequestCannedMessagePluginMessagePart3() with error"""
#
# packet = {
# 'decoded': {
# 'routing': {
# 'errorReason': 'some made up error',
# },
# },
# }
#
# iface = MagicMock(autospec=SerialInterface)
# with patch('meshtastic.serial_interface.SerialInterface', return_value=iface) as mo:
# anode = Node(mo, 'bar', noProto=True)
# # Note: Have to do this next line because every call to MagicMock object/method returns a new magic mock
# mo.localNode = anode
#
# with caplog.at_level(logging.DEBUG):
# anode.onResponseRequestCannedMessagePluginMessagePart3(packet)
# assert re.search(r'onResponseRequestCannedMessagePluginMessagePart3', caplog.text, re.MULTILINE)
# out, err = capsys.readouterr()
# assert re.search(r'Error on response', out)
# assert err == ''
#
#
#@pytest.mark.unit
#def test_onResponseRequestCannedMessagePluginMesagePart4_error(caplog, capsys):
# """Test onResponseRequestCannedMessagePluginMessagePart4() with error"""
#
# packet = {
# 'decoded': {
# 'routing': {
# 'errorReason': 'some made up error',
# },
# },
# }
#
# iface = MagicMock(autospec=SerialInterface)
# with patch('meshtastic.serial_interface.SerialInterface', return_value=iface) as mo:
# anode = Node(mo, 'bar', noProto=True)
# # Note: Have to do this next line because every call to MagicMock object/method returns a new magic mock
# mo.localNode = anode
#
# with caplog.at_level(logging.DEBUG):
# anode.onResponseRequestCannedMessagePluginMessagePart4(packet)
# assert re.search(r'onResponseRequestCannedMessagePluginMessagePart4', caplog.text, re.MULTILINE)
# out, err = capsys.readouterr()
# assert re.search(r'Error on response', out)
# assert err == ''
#
#
#@pytest.mark.unit
#def test_onResponseRequestCannedMessagePluginMesagePart5_error(caplog, capsys):
# """Test onResponseRequestCannedMessagePluginMessagePart5() with error"""
#
# packet = {
# 'decoded': {
# 'routing': {
# 'errorReason': 'some made up error',
# },
# },
# }
#
# iface = MagicMock(autospec=SerialInterface)
# with patch('meshtastic.serial_interface.SerialInterface', return_value=iface) as mo:
# anode = Node(mo, 'bar', noProto=True)
# # Note: Have to do this next line because every call to MagicMock object/method returns a new magic mock
# mo.localNode = anode
#
# with caplog.at_level(logging.DEBUG):
# anode.onResponseRequestCannedMessagePluginMessagePart5(packet)
# assert re.search(r'onResponseRequestCannedMessagePluginMessagePart5', caplog.text, re.MULTILINE)
# out, err = capsys.readouterr()
# assert re.search(r'Error on response', out)
# assert err == ''
@pytest.mark.unit
def test_onResponseRequestChannel(caplog):
"""Test onResponseRequestChannel()"""

View File

@@ -21,7 +21,7 @@ from ..util import findPorts
# seconds to pause after running a meshtastic command
PAUSE_AFTER_COMMAND = 0.1
PAUSE_AFTER_REBOOT = 0.1
PAUSE_AFTER_REBOOT = 0.2
#TODO: need to fix the virtual device to have a reboot. When you issue the command
@@ -358,6 +358,11 @@ def test_smokevirt_ch_set_downlink_and_uplink():
@pytest.mark.smokevirt
def test_smokevirt_ch_add_and_ch_del():
"""Test --ch-add"""
return_value, out = subprocess.getstatusoutput('meshtastic --host localhost --ch-index 1 --ch-del')
assert re.search(r'Deleting channel 1', out, re.MULTILINE)
assert return_value == 0
# pause for the radio
time.sleep(PAUSE_AFTER_REBOOT)
return_value, out = subprocess.getstatusoutput('meshtastic --host localhost --ch-add testing')
assert re.search(r'Writing modified channels to device', out, re.MULTILINE)
assert return_value == 0
@@ -375,7 +380,7 @@ def test_smokevirt_ch_add_and_ch_del():
assert return_value == 0
# pause for the radio
time.sleep(PAUSE_AFTER_REBOOT)
# make sure the secondar channel is not there
# make sure the secondary channel is not there
return_value, out = subprocess.getstatusoutput('meshtastic --host localhost --info')
assert re.match(r'Connected to radio', out)
assert not re.search(r'SECONDARY', out, re.MULTILINE)
@@ -386,6 +391,11 @@ def test_smokevirt_ch_add_and_ch_del():
@pytest.mark.smokevirt
def test_smokevirt_ch_enable_and_disable():
"""Test --ch-enable and --ch-disable"""
return_value, out = subprocess.getstatusoutput('meshtastic --host localhost --ch-index 1 --ch-del')
assert re.search(r'Deleting channel 1', out, re.MULTILINE)
assert return_value == 0
# pause for the radio
time.sleep(PAUSE_AFTER_REBOOT)
return_value, out = subprocess.getstatusoutput('meshtastic --host localhost --ch-add testing')
assert re.search(r'Writing modified channels to device', out, re.MULTILINE)
assert return_value == 0
@@ -434,6 +444,11 @@ def test_smokevirt_ch_enable_and_disable():
@pytest.mark.smokevirt
def test_smokevirt_ch_del_a_disabled_non_primary_channel():
"""Test --ch-del will work on a disabled non-primary channel."""
return_value, out = subprocess.getstatusoutput('meshtastic --host localhost --ch-index 1 --ch-del')
assert re.search(r'Deleting channel 1', out, re.MULTILINE)
assert return_value == 0
# pause for the radio
time.sleep(PAUSE_AFTER_REBOOT)
return_value, out = subprocess.getstatusoutput('meshtastic --host localhost --ch-add testing')
assert re.search(r'Writing modified channels to device', out, re.MULTILINE)
assert return_value == 0

View File

@@ -12,7 +12,9 @@ from meshtastic.util import (fixme, stripnl, pskToString, our_exit,
remove_keys_from_dict, Timeout, hexstr,
ipstr, readnet_u16, findPorts, convert_mac_addr,
snake_to_camel, camel_to_snake, eliminate_duplicate_port,
is_windows11)
is_windows11, active_ports_on_supported_devices)
from meshtastic.supported_device import SupportedDevice
@pytest.mark.unit
@@ -264,6 +266,22 @@ def test_findPorts_when_duplicate_found_and_duplicate_option_used(patch_comports
patch_comports.assert_called()
@pytest.mark.unitslow
@patch('serial.tools.list_ports.comports')
def test_findPorts_when_duplicate_found_and_duplicate_option_used_ports_reversed(patch_comports):
"""Test findPorts()"""
class TempPort:
""" temp class for port"""
def __init__(self, device=None, vid=None):
self.device = device
self.vid = vid
fake1 = TempPort('/dev/cu.usbserial-1430', vid='fake1')
fake2 = TempPort('/dev/cu.wchusbserial1430', vid='fake2')
patch_comports.return_value = [fake2, fake1]
assert findPorts(eliminate_duplicates=True) == ['/dev/cu.wchusbserial1430']
patch_comports.assert_called()
@pytest.mark.unitslow
@patch('serial.tools.list_ports.comports')
def test_findPorts_when_duplicate_found_and_duplicate_option_not_used(patch_comports):
@@ -315,8 +333,13 @@ def test_eliminate_duplicate_port():
assert eliminate_duplicate_port(['/dev/fake', '/dev/fake1']) == ['/dev/fake', '/dev/fake1']
assert eliminate_duplicate_port(['/dev/fake', '/dev/fake1', '/dev/fake2']) == ['/dev/fake', '/dev/fake1', '/dev/fake2']
assert eliminate_duplicate_port(['/dev/cu.usbserial-1430', '/dev/cu.wchusbserial1430']) == ['/dev/cu.wchusbserial1430']
assert eliminate_duplicate_port(['/dev/cu.wchusbserial1430', '/dev/cu.usbserial-1430']) == ['/dev/cu.wchusbserial1430']
assert eliminate_duplicate_port(['/dev/cu.SLAB_USBtoUART', '/dev/cu.usbserial-0001']) == ['/dev/cu.usbserial-0001']
assert eliminate_duplicate_port(['/dev/cu.usbserial-0001', '/dev/cu.SLAB_USBtoUART']) == ['/dev/cu.usbserial-0001']
assert eliminate_duplicate_port(['/dev/cu.usbmodem11301', '/dev/cu.wchusbserial11301']) == ['/dev/cu.wchusbserial11301']
assert eliminate_duplicate_port(['/dev/cu.usbmodem53230051441', '/dev/cu.wchusbserial53230051441']) == ['/dev/cu.wchusbserial53230051441']
assert eliminate_duplicate_port(['/dev/cu.wchusbserial53230051441', '/dev/cu.usbmodem53230051441']) == ['/dev/cu.wchusbserial53230051441']
assert eliminate_duplicate_port(['/dev/cu.wchusbserial11301', '/dev/cu.usbmodem11301']) == ['/dev/cu.wchusbserial11301']
@patch('platform.version', return_value='10.0.22000.194')
@patch('platform.release', return_value='10')
@@ -358,3 +381,78 @@ def test_is_windows11_false_win8_1(patched_platform, patched_release):
assert is_windows11() is False
patched_platform.assert_called()
patched_release.assert_called()
@pytest.mark.unit
@patch('platform.system', return_value='Linux')
def test_active_ports_on_supported_devices_empty(mock_platform):
"""Test active_ports_on_supported_devices()"""
sds = set()
assert active_ports_on_supported_devices(sds) == set()
mock_platform.assert_called()
@pytest.mark.unit
@patch('subprocess.getstatusoutput')
@patch('platform.system', return_value='Linux')
def test_active_ports_on_supported_devices_linux(mock_platform, mock_sp):
"""Test active_ports_on_supported_devices()"""
mock_sp.return_value = (None, 'crw-rw-rw- 1 root wheel 0x9000000 Feb 8 22:22 /dev/ttyUSBfake')
fake_device = SupportedDevice(name='a', for_firmware='heltec-v2.1', baseport_on_linux='ttyUSB')
fake_supported_devices = [fake_device]
assert active_ports_on_supported_devices(fake_supported_devices) == {'/dev/ttyUSBfake'}
mock_platform.assert_called()
mock_sp.assert_called()
@pytest.mark.unit
@patch('subprocess.getstatusoutput')
@patch('platform.system', return_value='Darwin')
def test_active_ports_on_supported_devices_mac(mock_platform, mock_sp):
"""Test active_ports_on_supported_devices()"""
mock_sp.return_value = (None, 'crw-rw-rw- 1 root wheel 0x9000000 Feb 8 22:22 /dev/cu.usbserial-foo')
fake_device = SupportedDevice(name='a', for_firmware='heltec-v2.1', baseport_on_linux='cu.usbserial-')
fake_supported_devices = [fake_device]
assert active_ports_on_supported_devices(fake_supported_devices) == {'/dev/cu.usbserial-foo'}
mock_platform.assert_called()
mock_sp.assert_called()
@pytest.mark.unit
@patch('meshtastic.util.detect_windows_port', return_value={'COM2'})
@patch('platform.system', return_value='Windows')
def test_active_ports_on_supported_devices_win(mock_platform, mock_dwp):
"""Test active_ports_on_supported_devices()"""
fake_device = SupportedDevice(name='a', for_firmware='heltec-v2.1')
fake_supported_devices = [fake_device]
assert active_ports_on_supported_devices(fake_supported_devices) == {'COM2'}
mock_platform.assert_called()
mock_dwp.assert_called()
@pytest.mark.unit
@patch('subprocess.getstatusoutput')
@patch('platform.system', return_value='Darwin')
def test_active_ports_on_supported_devices_mac_no_duplicates_check(mock_platform, mock_sp):
"""Test active_ports_on_supported_devices()"""
mock_sp.return_value = (None, ('crw-rw-rw- 1 root wheel 0x9000005 Mar 8 10:05 /dev/cu.usbmodem53230051441\n'
'crw-rw-rw- 1 root wheel 0x9000003 Mar 8 10:06 /dev/cu.wchusbserial53230051441'))
fake_device = SupportedDevice(name='a', for_firmware='tbeam', baseport_on_mac='cu.usbmodem')
fake_supported_devices = [fake_device]
assert active_ports_on_supported_devices(fake_supported_devices, False) == {'/dev/cu.usbmodem53230051441', '/dev/cu.wchusbserial53230051441'}
mock_platform.assert_called()
mock_sp.assert_called()
@pytest.mark.unit
@patch('subprocess.getstatusoutput')
@patch('platform.system', return_value='Darwin')
def test_active_ports_on_supported_devices_mac_duplicates_check(mock_platform, mock_sp):
"""Test active_ports_on_supported_devices()"""
mock_sp.return_value = (None, ('crw-rw-rw- 1 root wheel 0x9000005 Mar 8 10:05 /dev/cu.usbmodem53230051441\n'
'crw-rw-rw- 1 root wheel 0x9000003 Mar 8 10:06 /dev/cu.wchusbserial53230051441'))
fake_device = SupportedDevice(name='a', for_firmware='tbeam', baseport_on_mac='cu.usbmodem')
fake_supported_devices = [fake_device]
assert active_ports_on_supported_devices(fake_supported_devices, True) == {'/dev/cu.wchusbserial53230051441'}
mock_platform.assert_called()
mock_sp.assert_called()

View File

@@ -14,7 +14,8 @@ import subprocess
import serial
import serial.tools.list_ports
import pkg_resources
from meshtastic.supported_device import get_unique_vendor_ids, get_devices_with_vendor_id
from meshtastic.supported_device import supported_devices
"""Some devices such as a seger jlink we never want to accidentally open"""
blacklistVids = dict.fromkeys([0x1366])
@@ -401,6 +402,7 @@ def eliminate_duplicate_port(ports):
if len(ports) != 2:
new_ports = ports
else:
ports.sort()
if 'usbserial' in ports[0] and 'wchusbserial' in ports[1]:
first = ports[0].replace("usbserial-", "")
second = ports[1].replace("wchusbserial", "")
@@ -432,3 +434,112 @@ def is_windows11():
except Exception as e:
print(f'problem detecting win11 e:{e}')
return is_win11
def get_unique_vendor_ids():
"""Return a set of unique vendor ids"""
vids = set()
for d in supported_devices:
if d.usb_vendor_id_in_hex:
vids.add(d.usb_vendor_id_in_hex)
return vids
def get_devices_with_vendor_id(vid):
"""Return a set of unique devices with the vendor id"""
sd = set()
for d in supported_devices:
if d.usb_vendor_id_in_hex == vid:
sd.add(d)
return sd
def active_ports_on_supported_devices(sds, eliminate_duplicates=False):
"""Return a set of active ports based on the supplied supported devices"""
ports = set()
baseports = set()
system = platform.system()
# figure out what possible base ports there are
for d in sds:
if system == "Linux":
baseports.add(d.baseport_on_linux)
elif system == "Darwin":
baseports.add(d.baseport_on_mac)
elif system == "Windows":
baseports.add(d.baseport_on_windows)
for bp in baseports:
if system == "Linux":
# see if we have any devices (ignoring any stderr output)
command = f'ls -al /dev/{bp}* 2> /dev/null'
#print(f'command:{command}')
_, ls_output = subprocess.getstatusoutput(command)
#print(f'ls_output:{ls_output}')
# if we got output, there are ports
if len(ls_output) > 0:
#print('got output')
# for each line of output
lines = ls_output.split('\n')
#print(f'lines:{lines}')
for line in lines:
parts = line.split(' ')
#print(f'parts:{parts}')
port = parts[-1]
#print(f'port:{port}')
ports.add(port)
elif system == "Darwin":
# see if we have any devices (ignoring any stderr output)
command = f'ls -al /dev/{bp}* 2> /dev/null'
#print(f'command:{command}')
_, ls_output = subprocess.getstatusoutput(command)
#print(f'ls_output:{ls_output}')
# if we got output, there are ports
if len(ls_output) > 0:
#print('got output')
# for each line of output
lines = ls_output.split('\n')
#print(f'lines:{lines}')
for line in lines:
parts = line.split(' ')
#print(f'parts:{parts}')
port = parts[-1]
#print(f'port:{port}')
ports.add(port)
elif system == "Windows":
# for each device in supported devices found
for d in sds:
# find the port(s)
com_ports = detect_windows_port(d)
#print(f'com_ports:{com_ports}')
# add all ports
for com_port in com_ports:
ports.add(com_port)
if eliminate_duplicates:
ports = eliminate_duplicate_port(list(ports))
ports.sort()
ports = set(ports)
return ports
def detect_windows_port(sd):
"""detect if Windows port"""
ports = set()
if sd:
system = platform.system()
if system == "Windows":
command = ('powershell.exe "[Console]::OutputEncoding = [Text.UTF8Encoding]::UTF8;'
'Get-PnpDevice -PresentOnly | Where-Object{ ($_.DeviceId -like ')
command += f"'*{sd.usb_vendor_id_in_hex.upper()}*'"
command += ')} | Format-List"'
#print(f'command:{command}')
_, sp_output = subprocess.getstatusoutput(command)
#print(f'sp_output:{sp_output}')
p = re.compile(r'\(COM(.*)\)')
for x in p.findall(sp_output):
#print(f'x:{x}')
ports.add(f'COM{x}')
return ports

2
proto

Submodule proto updated: 30e147a55c...c851209e0b

View File

@@ -12,7 +12,7 @@ with open("README.md", "r") as fh:
# This call to setup() does all the work
setup(
name="meshtastic",
version="1.2.86",
version="1.2.94",
description="Python API & client shell for talking to Meshtastic devices",
long_description=long_description,
long_description_content_type="text/markdown",