mirror of
https://github.com/meshtastic/python.git
synced 2025-12-25 17:07:53 -05:00
Compare commits
29 Commits
1.3alpha.3
...
1.3alpha.8
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
b6dc4d0bd2 | ||
|
|
8ec5dbbf38 | ||
|
|
0c760f3721 | ||
|
|
c45568731f | ||
|
|
ef4c9dc338 | ||
|
|
89de553aba | ||
|
|
247e7b2605 | ||
|
|
f81ba64b91 | ||
|
|
d6fbca1bf1 | ||
|
|
6fbf78fa19 | ||
|
|
e38a614c37 | ||
|
|
d8b9665946 | ||
|
|
e655beb01c | ||
|
|
aa1dcb7f99 | ||
|
|
1a2519d647 | ||
|
|
a3572efaa6 | ||
|
|
e28f0d5509 | ||
|
|
789a14054f | ||
|
|
4655b5c732 | ||
|
|
81ebd8c8e7 | ||
|
|
7998520e4a | ||
|
|
460196a62b | ||
|
|
22f43851bd | ||
|
|
91bb63eb97 | ||
|
|
ede1b5f08b | ||
|
|
a26c157c65 | ||
|
|
bcf00a1f8d | ||
|
|
749a94e6e4 | ||
|
|
fc9d1e077c |
18
.github/workflows/release.yml
vendored
18
.github/workflows/release.yml
vendored
@@ -8,6 +8,7 @@ jobs:
|
||||
outputs:
|
||||
version: ${{ steps.get_version.outputs.version }}
|
||||
upload_url: ${{ steps.create_release.outputs.upload_url }}
|
||||
new_sha: ${{ steps.commit_updated.outputs.sha }}
|
||||
|
||||
steps:
|
||||
|
||||
@@ -19,12 +20,14 @@ jobs:
|
||||
bin/bump_version.py
|
||||
|
||||
- name: Commit updated version.py
|
||||
id: commit_updated
|
||||
run: |
|
||||
git config --global user.name 'github-actions'
|
||||
git config --global user.email 'bot@noreply.github.com'
|
||||
git remote set-url origin https://x-access-token:${{ secrets.GITHUB_TOKEN }}@github.com/${{ github.repository }}
|
||||
git add setup.py
|
||||
git commit -m "bump version" && git push || echo "No changes to commit"
|
||||
git log -n 1 --pretty=format:"%H" | tail -n 1 | awk '{print "::set-output name=sha::"$0}'
|
||||
|
||||
- name: Get version
|
||||
id: get_version
|
||||
@@ -45,9 +48,6 @@ jobs:
|
||||
env:
|
||||
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
|
||||
|
||||
- name: Checkout
|
||||
uses: actions/checkout@v2
|
||||
|
||||
- name: Set up Python 3.9
|
||||
uses: actions/setup-python@v2
|
||||
with:
|
||||
@@ -82,7 +82,9 @@ jobs:
|
||||
steps:
|
||||
|
||||
- name: Checkout
|
||||
uses: actions/checkout@v2
|
||||
uses: actions/checkout@v3
|
||||
with:
|
||||
ref: ${{ needs.release_create.outputs.new_sha }}
|
||||
|
||||
- name: Set up Python 3.9
|
||||
uses: actions/setup-python@v2
|
||||
@@ -127,7 +129,9 @@ jobs:
|
||||
steps:
|
||||
|
||||
- name: Checkout
|
||||
uses: actions/checkout@v2
|
||||
uses: actions/checkout@v3
|
||||
with:
|
||||
ref: ${{ needs.release_create.outputs.new_sha }}
|
||||
|
||||
- name: Set up Python 3.9
|
||||
uses: actions/setup-python@v2
|
||||
@@ -167,7 +171,9 @@ jobs:
|
||||
steps:
|
||||
|
||||
- name: Checkout
|
||||
uses: actions/checkout@v2
|
||||
uses: actions/checkout@v3
|
||||
with:
|
||||
ref: ${{ needs.release_create.outputs.new_sha }}
|
||||
|
||||
- name: Set up Python 3.9
|
||||
uses: actions/setup-python@v2
|
||||
|
||||
@@ -3,8 +3,7 @@
|
||||
"""
|
||||
|
||||
import sys
|
||||
from meshtastic.supported_device import get_unique_vendor_ids, active_ports_on_supported_devices
|
||||
from meshtastic.util import detect_supported_devices
|
||||
from meshtastic.util import detect_supported_devices, get_unique_vendor_ids, active_ports_on_supported_devices
|
||||
|
||||
# simple arg check
|
||||
if len(sys.argv) != 1:
|
||||
|
||||
6
examples/show_ports.py
Normal file
6
examples/show_ports.py
Normal file
@@ -0,0 +1,6 @@
|
||||
"""Simple program to show serial ports.
|
||||
"""
|
||||
|
||||
from meshtastic.util import findPorts
|
||||
|
||||
print(findPorts())
|
||||
@@ -139,11 +139,22 @@ def setPref(attributes, name, valStr):
|
||||
for temp_name in sorted(names):
|
||||
print(f" {temp_name}")
|
||||
return
|
||||
try:
|
||||
setattr(attributes, snake_name, val)
|
||||
except TypeError:
|
||||
# The setter didn't like our arg type guess try again as a string
|
||||
setattr(attributes, snake_name, valStr)
|
||||
|
||||
# note: 'ignore_incoming' is a repeating field
|
||||
if snake_name != 'ignore_incoming':
|
||||
try:
|
||||
setattr(attributes, snake_name, val)
|
||||
except TypeError:
|
||||
# The setter didn't like our arg type guess try again as a string
|
||||
setattr(attributes, snake_name, valStr)
|
||||
else:
|
||||
if val == 0:
|
||||
# clear values
|
||||
print("Clearing ignore_incoming list")
|
||||
del attributes.ignore_incoming[:]
|
||||
else:
|
||||
print(f"Adding '{val}' to the ignore_incoming list")
|
||||
attributes.ignore_incoming.extend([val])
|
||||
|
||||
if Globals.getInstance().get_camel_case():
|
||||
print(f"Set {camel_name} to {valStr}")
|
||||
|
||||
File diff suppressed because one or more lines are too long
File diff suppressed because one or more lines are too long
@@ -2,10 +2,6 @@
|
||||
It is used for auto detection as to which device might be connected.
|
||||
"""
|
||||
|
||||
import platform
|
||||
import subprocess
|
||||
import re
|
||||
|
||||
# Goal is to detect which device and port to use from the supported devices
|
||||
# without installing any libraries that are not currently in the python meshtastic library
|
||||
|
||||
@@ -42,16 +38,13 @@ tbeam_M8N = SupportedDevice(name="T-Beam", version="M8N", for_firmware="tbeam",
|
||||
tbeam_M8N_SX1262 = SupportedDevice(name="T-Beam", version="M8N_SX1262", for_firmware="tbeam",
|
||||
baseport_on_linux="ttyACM", baseport_on_mac="cu.usbmodem",
|
||||
usb_vendor_id_in_hex="1a86", usb_product_id_in_hex="55d4")
|
||||
tlora_v1_1 = SupportedDevice(name="T-Lora", version="1.1", for_firmware="tlora-v1",
|
||||
tlora_v1 = SupportedDevice(name="T-Lora", version="1", for_firmware="tlora-v1",
|
||||
baseport_on_linux="ttyUSB", baseport_on_mac="cu.usbserial",
|
||||
usb_vendor_id_in_hex="10c4", usb_product_id_in_hex="ea60")
|
||||
usb_vendor_id_in_hex="1a86", usb_product_id_in_hex="55d4")
|
||||
tlora_v1_3 = SupportedDevice(name="T-Lora", version="1.3", for_firmware="tlora-v1-3",
|
||||
baseport_on_linux="ttyUSB", baseport_on_mac="cu.usbserial",
|
||||
usb_vendor_id_in_hex="10c4", usb_product_id_in_hex="ea60")
|
||||
tlora_v2_0 = SupportedDevice(name="T-Lora", version="2.0", for_firmware="tlora-v2-1",
|
||||
baseport_on_linux="ttyACM", baseport_on_mac="cu.usbmodem",
|
||||
usb_vendor_id_in_hex="1a86", usb_product_id_in_hex="55d4")
|
||||
tlora_v2_1 = SupportedDevice(name="T-Lora", version="2.1", for_firmware="tlora-v2-1",
|
||||
tlora_v2 = SupportedDevice(name="T-Lora", version="2", for_firmware="tlora-v2",
|
||||
baseport_on_linux="ttyACM", baseport_on_mac="cu.usbmodem",
|
||||
usb_vendor_id_in_hex="1a86", usb_product_id_in_hex="55d4")
|
||||
tlora_v2_1_1_6 = SupportedDevice(name="T-Lora", version="2.1-1.6", for_firmware="tlora-v2-1-1.6",
|
||||
@@ -87,110 +80,7 @@ rak4631_19003 = SupportedDevice(name="RAK 4631 19003", version="", for_firmware=
|
||||
usb_vendor_id_in_hex="239a", usb_product_id_in_hex="8029")
|
||||
|
||||
supported_devices = [tbeam_v0_7, tbeam_v1_1, tbeam_M8N, tbeam_M8N_SX1262,
|
||||
tlora_v1_1, tlora_v1_3, tlora_v2_0, tlora_v2_1, tlora_v2_1_1_6,
|
||||
tlora_v1, tlora_v1_3, tlora_v2, tlora_v2_1_1_6,
|
||||
heltec_v1, heltec_v2_0, heltec_v2_1,
|
||||
meshtastic_diy_v1, techo_1, rak4631_5005, rak4631_19003,
|
||||
rak11200]
|
||||
|
||||
|
||||
def get_unique_vendor_ids():
|
||||
"""Return a set of unique vendor ids"""
|
||||
vids = set()
|
||||
for d in supported_devices:
|
||||
if d.usb_vendor_id_in_hex:
|
||||
vids.add(d.usb_vendor_id_in_hex)
|
||||
return vids
|
||||
|
||||
def get_devices_with_vendor_id(vid):
|
||||
"""Return a set of unique devices with the vendor id"""
|
||||
sd = set()
|
||||
for d in supported_devices:
|
||||
if d.usb_vendor_id_in_hex == vid:
|
||||
sd.add(d)
|
||||
return sd
|
||||
|
||||
def active_ports_on_supported_devices(sds):
|
||||
"""Return a set of active ports based on the supplied supported devices"""
|
||||
ports = set()
|
||||
baseports = set()
|
||||
system = platform.system()
|
||||
|
||||
# figure out what possible base ports there are
|
||||
for d in sds:
|
||||
if system == "Linux":
|
||||
baseports.add(d.baseport_on_linux)
|
||||
elif system == "Darwin":
|
||||
baseports.add(d.baseport_on_mac)
|
||||
elif system == "Windows":
|
||||
baseports.add(d.baseport_on_windows)
|
||||
|
||||
for bp in baseports:
|
||||
if system == "Linux":
|
||||
# see if we have any devices (ignoring any stderr output)
|
||||
command = f'ls -al /dev/{bp}* 2> /dev/null'
|
||||
#print(f'command:{command}')
|
||||
_, ls_output = subprocess.getstatusoutput(command)
|
||||
#print(f'ls_output:{ls_output}')
|
||||
# if we got output, there are ports
|
||||
if len(ls_output) > 0:
|
||||
#print('got output')
|
||||
# for each line of output
|
||||
lines = ls_output.split('\n')
|
||||
#print(f'lines:{lines}')
|
||||
for line in lines:
|
||||
parts = line.split(' ')
|
||||
#print(f'parts:{parts}')
|
||||
port = parts[-1]
|
||||
#print(f'port:{port}')
|
||||
ports.add(port)
|
||||
elif system == "Darwin":
|
||||
# see if we have any devices (ignoring any stderr output)
|
||||
command = f'ls -al /dev/{bp}* 2> /dev/null'
|
||||
#print(f'command:{command}')
|
||||
_, ls_output = subprocess.getstatusoutput(command)
|
||||
#print(f'ls_output:{ls_output}')
|
||||
# if we got output, there are ports
|
||||
if len(ls_output) > 0:
|
||||
#print('got output')
|
||||
# for each line of output
|
||||
lines = ls_output.split('\n')
|
||||
#print(f'lines:{lines}')
|
||||
for line in lines:
|
||||
parts = line.split(' ')
|
||||
#print(f'parts:{parts}')
|
||||
port = parts[-1]
|
||||
#print(f'port:{port}')
|
||||
ports.add(port)
|
||||
elif system == "Windows":
|
||||
# for each device in supported devices found
|
||||
for d in sds:
|
||||
# find the port(s)
|
||||
com_ports = detect_windows_port(d)
|
||||
#print(f'com_ports:{com_ports}')
|
||||
# add all ports
|
||||
for com_port in com_ports:
|
||||
ports.add(com_port)
|
||||
return ports
|
||||
|
||||
|
||||
def detect_windows_port(sd):
|
||||
"""detect if Windows port"""
|
||||
ports = set()
|
||||
|
||||
if sd:
|
||||
system = platform.system()
|
||||
|
||||
if system == "Windows":
|
||||
command = ('powershell.exe "[Console]::OutputEncoding = [Text.UTF8Encoding]::UTF8;'
|
||||
'Get-PnpDevice -PresentOnly | Where-Object{ ($_.DeviceId -like ')
|
||||
command += f"'*{sd.usb_vendor_id_in_hex.upper()}*'"
|
||||
command += ')} | Format-List"'
|
||||
|
||||
#print(f'command:{command}')
|
||||
_, sp_output = subprocess.getstatusoutput(command)
|
||||
#print(f'sp_output:{sp_output}')
|
||||
p = re.compile(r'\(COM(.*)\)')
|
||||
for x in p.findall(sp_output):
|
||||
#print(f'x:{x}')
|
||||
ports.add(f'COM{x}')
|
||||
return ports
|
||||
|
||||
@@ -2331,6 +2331,54 @@ def test_main_setPref_invalid_field_camel(capsys):
|
||||
assert err == ''
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
@pytest.mark.usefixtures("reset_globals")
|
||||
def test_main_setPref_ignore_incoming_123(capsys):
|
||||
"""Test setPref() with ignore_incoming"""
|
||||
|
||||
class Field:
|
||||
"""Simple class for testing."""
|
||||
|
||||
def __init__(self, name, enum_type):
|
||||
"""constructor"""
|
||||
self.name = name
|
||||
self.enum_type = enum_type
|
||||
|
||||
ignore_incoming_field = Field('ignore_incoming', 'list')
|
||||
prefs = MagicMock()
|
||||
prefs.DESCRIPTOR.fields_by_name.get.return_value = ignore_incoming_field
|
||||
|
||||
setPref(prefs, 'ignore_incoming', '123')
|
||||
out, err = capsys.readouterr()
|
||||
assert re.search(r"Adding '123' to the ignore_incoming list", out, re.MULTILINE)
|
||||
assert re.search(r'Set ignore_incoming to 123', out, re.MULTILINE)
|
||||
assert err == ''
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
@pytest.mark.usefixtures("reset_globals")
|
||||
def test_main_setPref_ignore_incoming_0(capsys):
|
||||
"""Test setPref() with ignore_incoming"""
|
||||
|
||||
class Field:
|
||||
"""Simple class for testing."""
|
||||
|
||||
def __init__(self, name, enum_type):
|
||||
"""constructor"""
|
||||
self.name = name
|
||||
self.enum_type = enum_type
|
||||
|
||||
ignore_incoming_field = Field('ignore_incoming', 'list')
|
||||
prefs = MagicMock()
|
||||
prefs.DESCRIPTOR.fields_by_name.get.return_value = ignore_incoming_field
|
||||
|
||||
setPref(prefs, 'ignore_incoming', '0')
|
||||
out, err = capsys.readouterr()
|
||||
assert re.search(r'Clearing ignore_incoming list', out, re.MULTILINE)
|
||||
assert re.search(r'Set ignore_incoming to 0', out, re.MULTILINE)
|
||||
assert err == ''
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
@pytest.mark.usefixtures("reset_globals")
|
||||
def test_main_ch_set_psk_no_ch_index(capsys):
|
||||
|
||||
@@ -12,7 +12,9 @@ from meshtastic.util import (fixme, stripnl, pskToString, our_exit,
|
||||
remove_keys_from_dict, Timeout, hexstr,
|
||||
ipstr, readnet_u16, findPorts, convert_mac_addr,
|
||||
snake_to_camel, camel_to_snake, eliminate_duplicate_port,
|
||||
is_windows11)
|
||||
is_windows11, active_ports_on_supported_devices)
|
||||
|
||||
from meshtastic.supported_device import SupportedDevice
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
@@ -264,6 +266,22 @@ def test_findPorts_when_duplicate_found_and_duplicate_option_used(patch_comports
|
||||
patch_comports.assert_called()
|
||||
|
||||
|
||||
@pytest.mark.unitslow
|
||||
@patch('serial.tools.list_ports.comports')
|
||||
def test_findPorts_when_duplicate_found_and_duplicate_option_used_ports_reversed(patch_comports):
|
||||
"""Test findPorts()"""
|
||||
class TempPort:
|
||||
""" temp class for port"""
|
||||
def __init__(self, device=None, vid=None):
|
||||
self.device = device
|
||||
self.vid = vid
|
||||
fake1 = TempPort('/dev/cu.usbserial-1430', vid='fake1')
|
||||
fake2 = TempPort('/dev/cu.wchusbserial1430', vid='fake2')
|
||||
patch_comports.return_value = [fake2, fake1]
|
||||
assert findPorts(eliminate_duplicates=True) == ['/dev/cu.wchusbserial1430']
|
||||
patch_comports.assert_called()
|
||||
|
||||
|
||||
@pytest.mark.unitslow
|
||||
@patch('serial.tools.list_ports.comports')
|
||||
def test_findPorts_when_duplicate_found_and_duplicate_option_not_used(patch_comports):
|
||||
@@ -315,8 +333,13 @@ def test_eliminate_duplicate_port():
|
||||
assert eliminate_duplicate_port(['/dev/fake', '/dev/fake1']) == ['/dev/fake', '/dev/fake1']
|
||||
assert eliminate_duplicate_port(['/dev/fake', '/dev/fake1', '/dev/fake2']) == ['/dev/fake', '/dev/fake1', '/dev/fake2']
|
||||
assert eliminate_duplicate_port(['/dev/cu.usbserial-1430', '/dev/cu.wchusbserial1430']) == ['/dev/cu.wchusbserial1430']
|
||||
assert eliminate_duplicate_port(['/dev/cu.wchusbserial1430', '/dev/cu.usbserial-1430']) == ['/dev/cu.wchusbserial1430']
|
||||
assert eliminate_duplicate_port(['/dev/cu.SLAB_USBtoUART', '/dev/cu.usbserial-0001']) == ['/dev/cu.usbserial-0001']
|
||||
assert eliminate_duplicate_port(['/dev/cu.usbserial-0001', '/dev/cu.SLAB_USBtoUART']) == ['/dev/cu.usbserial-0001']
|
||||
assert eliminate_duplicate_port(['/dev/cu.usbmodem11301', '/dev/cu.wchusbserial11301']) == ['/dev/cu.wchusbserial11301']
|
||||
assert eliminate_duplicate_port(['/dev/cu.wchusbserial11301', '/dev/cu.usbmodem11301']) == ['/dev/cu.wchusbserial11301']
|
||||
assert eliminate_duplicate_port(['/dev/cu.usbmodem53230051441', '/dev/cu.wchusbserial53230051441']) == ['/dev/cu.wchusbserial53230051441']
|
||||
assert eliminate_duplicate_port(['/dev/cu.wchusbserial53230051441', '/dev/cu.usbmodem53230051441']) == ['/dev/cu.wchusbserial53230051441']
|
||||
|
||||
@patch('platform.version', return_value='10.0.22000.194')
|
||||
@patch('platform.release', return_value='10')
|
||||
@@ -358,3 +381,78 @@ def test_is_windows11_false_win8_1(patched_platform, patched_release):
|
||||
assert is_windows11() is False
|
||||
patched_platform.assert_called()
|
||||
patched_release.assert_called()
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
@patch('platform.system', return_value='Linux')
|
||||
def test_active_ports_on_supported_devices_empty(mock_platform):
|
||||
"""Test active_ports_on_supported_devices()"""
|
||||
sds = set()
|
||||
assert active_ports_on_supported_devices(sds) == set()
|
||||
mock_platform.assert_called()
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
@patch('subprocess.getstatusoutput')
|
||||
@patch('platform.system', return_value='Linux')
|
||||
def test_active_ports_on_supported_devices_linux(mock_platform, mock_sp):
|
||||
"""Test active_ports_on_supported_devices()"""
|
||||
mock_sp.return_value = (None, 'crw-rw-rw- 1 root wheel 0x9000000 Feb 8 22:22 /dev/ttyUSBfake')
|
||||
fake_device = SupportedDevice(name='a', for_firmware='heltec-v2.1', baseport_on_linux='ttyUSB')
|
||||
fake_supported_devices = [fake_device]
|
||||
assert active_ports_on_supported_devices(fake_supported_devices) == {'/dev/ttyUSBfake'}
|
||||
mock_platform.assert_called()
|
||||
mock_sp.assert_called()
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
@patch('subprocess.getstatusoutput')
|
||||
@patch('platform.system', return_value='Darwin')
|
||||
def test_active_ports_on_supported_devices_mac(mock_platform, mock_sp):
|
||||
"""Test active_ports_on_supported_devices()"""
|
||||
mock_sp.return_value = (None, 'crw-rw-rw- 1 root wheel 0x9000000 Feb 8 22:22 /dev/cu.usbserial-foo')
|
||||
fake_device = SupportedDevice(name='a', for_firmware='heltec-v2.1', baseport_on_linux='cu.usbserial-')
|
||||
fake_supported_devices = [fake_device]
|
||||
assert active_ports_on_supported_devices(fake_supported_devices) == {'/dev/cu.usbserial-foo'}
|
||||
mock_platform.assert_called()
|
||||
mock_sp.assert_called()
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
@patch('meshtastic.util.detect_windows_port', return_value={'COM2'})
|
||||
@patch('platform.system', return_value='Windows')
|
||||
def test_active_ports_on_supported_devices_win(mock_platform, mock_dwp):
|
||||
"""Test active_ports_on_supported_devices()"""
|
||||
fake_device = SupportedDevice(name='a', for_firmware='heltec-v2.1')
|
||||
fake_supported_devices = [fake_device]
|
||||
assert active_ports_on_supported_devices(fake_supported_devices) == {'COM2'}
|
||||
mock_platform.assert_called()
|
||||
mock_dwp.assert_called()
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
@patch('subprocess.getstatusoutput')
|
||||
@patch('platform.system', return_value='Darwin')
|
||||
def test_active_ports_on_supported_devices_mac_no_duplicates_check(mock_platform, mock_sp):
|
||||
"""Test active_ports_on_supported_devices()"""
|
||||
mock_sp.return_value = (None, ('crw-rw-rw- 1 root wheel 0x9000005 Mar 8 10:05 /dev/cu.usbmodem53230051441\n'
|
||||
'crw-rw-rw- 1 root wheel 0x9000003 Mar 8 10:06 /dev/cu.wchusbserial53230051441'))
|
||||
fake_device = SupportedDevice(name='a', for_firmware='tbeam', baseport_on_mac='cu.usbmodem')
|
||||
fake_supported_devices = [fake_device]
|
||||
assert active_ports_on_supported_devices(fake_supported_devices, False) == {'/dev/cu.usbmodem53230051441', '/dev/cu.wchusbserial53230051441'}
|
||||
mock_platform.assert_called()
|
||||
mock_sp.assert_called()
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
@patch('subprocess.getstatusoutput')
|
||||
@patch('platform.system', return_value='Darwin')
|
||||
def test_active_ports_on_supported_devices_mac_duplicates_check(mock_platform, mock_sp):
|
||||
"""Test active_ports_on_supported_devices()"""
|
||||
mock_sp.return_value = (None, ('crw-rw-rw- 1 root wheel 0x9000005 Mar 8 10:05 /dev/cu.usbmodem53230051441\n'
|
||||
'crw-rw-rw- 1 root wheel 0x9000003 Mar 8 10:06 /dev/cu.wchusbserial53230051441'))
|
||||
fake_device = SupportedDevice(name='a', for_firmware='tbeam', baseport_on_mac='cu.usbmodem')
|
||||
fake_supported_devices = [fake_device]
|
||||
assert active_ports_on_supported_devices(fake_supported_devices, True) == {'/dev/cu.wchusbserial53230051441'}
|
||||
mock_platform.assert_called()
|
||||
mock_sp.assert_called()
|
||||
|
||||
@@ -14,7 +14,8 @@ import subprocess
|
||||
import serial
|
||||
import serial.tools.list_ports
|
||||
import pkg_resources
|
||||
from meshtastic.supported_device import get_unique_vendor_ids, get_devices_with_vendor_id
|
||||
|
||||
from meshtastic.supported_device import supported_devices
|
||||
|
||||
"""Some devices such as a seger jlink we never want to accidentally open"""
|
||||
blacklistVids = dict.fromkeys([0x1366])
|
||||
@@ -401,6 +402,7 @@ def eliminate_duplicate_port(ports):
|
||||
if len(ports) != 2:
|
||||
new_ports = ports
|
||||
else:
|
||||
ports.sort()
|
||||
if 'usbserial' in ports[0] and 'wchusbserial' in ports[1]:
|
||||
first = ports[0].replace("usbserial-", "")
|
||||
second = ports[1].replace("wchusbserial", "")
|
||||
@@ -432,3 +434,112 @@ def is_windows11():
|
||||
except Exception as e:
|
||||
print(f'problem detecting win11 e:{e}')
|
||||
return is_win11
|
||||
|
||||
|
||||
def get_unique_vendor_ids():
|
||||
"""Return a set of unique vendor ids"""
|
||||
vids = set()
|
||||
for d in supported_devices:
|
||||
if d.usb_vendor_id_in_hex:
|
||||
vids.add(d.usb_vendor_id_in_hex)
|
||||
return vids
|
||||
|
||||
|
||||
def get_devices_with_vendor_id(vid):
|
||||
"""Return a set of unique devices with the vendor id"""
|
||||
sd = set()
|
||||
for d in supported_devices:
|
||||
if d.usb_vendor_id_in_hex == vid:
|
||||
sd.add(d)
|
||||
return sd
|
||||
|
||||
|
||||
def active_ports_on_supported_devices(sds, eliminate_duplicates=False):
|
||||
"""Return a set of active ports based on the supplied supported devices"""
|
||||
ports = set()
|
||||
baseports = set()
|
||||
system = platform.system()
|
||||
|
||||
# figure out what possible base ports there are
|
||||
for d in sds:
|
||||
if system == "Linux":
|
||||
baseports.add(d.baseport_on_linux)
|
||||
elif system == "Darwin":
|
||||
baseports.add(d.baseport_on_mac)
|
||||
elif system == "Windows":
|
||||
baseports.add(d.baseport_on_windows)
|
||||
|
||||
for bp in baseports:
|
||||
if system == "Linux":
|
||||
# see if we have any devices (ignoring any stderr output)
|
||||
command = f'ls -al /dev/{bp}* 2> /dev/null'
|
||||
#print(f'command:{command}')
|
||||
_, ls_output = subprocess.getstatusoutput(command)
|
||||
#print(f'ls_output:{ls_output}')
|
||||
# if we got output, there are ports
|
||||
if len(ls_output) > 0:
|
||||
#print('got output')
|
||||
# for each line of output
|
||||
lines = ls_output.split('\n')
|
||||
#print(f'lines:{lines}')
|
||||
for line in lines:
|
||||
parts = line.split(' ')
|
||||
#print(f'parts:{parts}')
|
||||
port = parts[-1]
|
||||
#print(f'port:{port}')
|
||||
ports.add(port)
|
||||
elif system == "Darwin":
|
||||
# see if we have any devices (ignoring any stderr output)
|
||||
command = f'ls -al /dev/{bp}* 2> /dev/null'
|
||||
#print(f'command:{command}')
|
||||
_, ls_output = subprocess.getstatusoutput(command)
|
||||
#print(f'ls_output:{ls_output}')
|
||||
# if we got output, there are ports
|
||||
if len(ls_output) > 0:
|
||||
#print('got output')
|
||||
# for each line of output
|
||||
lines = ls_output.split('\n')
|
||||
#print(f'lines:{lines}')
|
||||
for line in lines:
|
||||
parts = line.split(' ')
|
||||
#print(f'parts:{parts}')
|
||||
port = parts[-1]
|
||||
#print(f'port:{port}')
|
||||
ports.add(port)
|
||||
elif system == "Windows":
|
||||
# for each device in supported devices found
|
||||
for d in sds:
|
||||
# find the port(s)
|
||||
com_ports = detect_windows_port(d)
|
||||
#print(f'com_ports:{com_ports}')
|
||||
# add all ports
|
||||
for com_port in com_ports:
|
||||
ports.add(com_port)
|
||||
if eliminate_duplicates:
|
||||
ports = eliminate_duplicate_port(list(ports))
|
||||
ports.sort()
|
||||
ports = set(ports)
|
||||
return ports
|
||||
|
||||
|
||||
def detect_windows_port(sd):
|
||||
"""detect if Windows port"""
|
||||
ports = set()
|
||||
|
||||
if sd:
|
||||
system = platform.system()
|
||||
|
||||
if system == "Windows":
|
||||
command = ('powershell.exe "[Console]::OutputEncoding = [Text.UTF8Encoding]::UTF8;'
|
||||
'Get-PnpDevice -PresentOnly | Where-Object{ ($_.DeviceId -like ')
|
||||
command += f"'*{sd.usb_vendor_id_in_hex.upper()}*'"
|
||||
command += ')} | Format-List"'
|
||||
|
||||
#print(f'command:{command}')
|
||||
_, sp_output = subprocess.getstatusoutput(command)
|
||||
#print(f'sp_output:{sp_output}')
|
||||
p = re.compile(r'\(COM(.*)\)')
|
||||
for x in p.findall(sp_output):
|
||||
#print(f'x:{x}')
|
||||
ports.add(f'COM{x}')
|
||||
return ports
|
||||
|
||||
2
proto
2
proto
Submodule proto updated: 7c49bdad99...441303d531
2
setup.py
2
setup.py
@@ -12,7 +12,7 @@ with open("README.md", "r") as fh:
|
||||
# This call to setup() does all the work
|
||||
setup(
|
||||
name="meshtastic",
|
||||
version="1.3alpha.2",
|
||||
version="1.3alpha.7",
|
||||
description="Python API & client shell for talking to Meshtastic devices",
|
||||
long_description=long_description,
|
||||
long_description_content_type="text/markdown",
|
||||
|
||||
Reference in New Issue
Block a user