mirror of
https://github.com/meshtastic/python.git
synced 2026-01-14 10:47:59 -05:00
Compare commits
18 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
4f7f38e0a7 | ||
|
|
85dca2e14e | ||
|
|
e53a5023f1 | ||
|
|
ce8b75d96d | ||
|
|
26f65c4fee | ||
|
|
1ba1e51ca4 | ||
|
|
f674afc412 | ||
|
|
db90b898e1 | ||
|
|
71621c2225 | ||
|
|
ed36fca4a2 | ||
|
|
fdd3699ba5 | ||
|
|
7d4b39643b | ||
|
|
7698cd2c7d | ||
|
|
03c744df54 | ||
|
|
90978d1f35 | ||
|
|
68a2bf271a | ||
|
|
8301384c53 | ||
|
|
045592212a |
@@ -24,8 +24,10 @@ class SerialInterface(StreamInterface):
|
|||||||
"""
|
"""
|
||||||
self.noProto = noProto
|
self.noProto = noProto
|
||||||
|
|
||||||
if devPath is None:
|
self.devPath = devPath
|
||||||
ports = meshtastic.util.findPorts()
|
|
||||||
|
if self.devPath is None:
|
||||||
|
ports = meshtastic.util.findPorts(True)
|
||||||
logging.debug(f"ports:{ports}")
|
logging.debug(f"ports:{ports}")
|
||||||
if len(ports) == 0:
|
if len(ports) == 0:
|
||||||
meshtastic.util.our_exit("Warning: No Meshtastic devices detected.")
|
meshtastic.util.our_exit("Warning: No Meshtastic devices detected.")
|
||||||
@@ -34,21 +36,21 @@ class SerialInterface(StreamInterface):
|
|||||||
message += f" Ports detected:{ports}"
|
message += f" Ports detected:{ports}"
|
||||||
meshtastic.util.our_exit(message)
|
meshtastic.util.our_exit(message)
|
||||||
else:
|
else:
|
||||||
devPath = ports[0]
|
self.devPath = ports[0]
|
||||||
|
|
||||||
logging.debug(f"Connecting to {devPath}")
|
logging.debug(f"Connecting to {self.devPath}")
|
||||||
|
|
||||||
# first we need to set the HUPCL so the device will not reboot based on RTS and/or DTR
|
# first we need to set the HUPCL so the device will not reboot based on RTS and/or DTR
|
||||||
# see https://github.com/pyserial/pyserial/issues/124
|
# see https://github.com/pyserial/pyserial/issues/124
|
||||||
if platform.system() != 'Windows':
|
if platform.system() != 'Windows':
|
||||||
with open(devPath, encoding='utf8') as f:
|
with open(self.devPath, encoding='utf8') as f:
|
||||||
attrs = termios.tcgetattr(f)
|
attrs = termios.tcgetattr(f)
|
||||||
attrs[2] = attrs[2] & ~termios.HUPCL
|
attrs[2] = attrs[2] & ~termios.HUPCL
|
||||||
termios.tcsetattr(f, termios.TCSAFLUSH, attrs)
|
termios.tcsetattr(f, termios.TCSAFLUSH, attrs)
|
||||||
f.close()
|
f.close()
|
||||||
time.sleep(0.1)
|
time.sleep(0.1)
|
||||||
|
|
||||||
self.stream = serial.Serial(devPath, 921600, exclusive=True, timeout=0.5, write_timeout=0)
|
self.stream = serial.Serial(self.devPath, 921600, exclusive=True, timeout=0.5, write_timeout=0)
|
||||||
self.stream.flush()
|
self.stream.flush()
|
||||||
time.sleep(0.1)
|
time.sleep(0.1)
|
||||||
|
|
||||||
|
|||||||
@@ -8,7 +8,7 @@ import serial
|
|||||||
|
|
||||||
|
|
||||||
from meshtastic.mesh_interface import MeshInterface
|
from meshtastic.mesh_interface import MeshInterface
|
||||||
from meshtastic.util import stripnl
|
from meshtastic.util import stripnl, is_windows11
|
||||||
|
|
||||||
|
|
||||||
START1 = 0x94
|
START1 = 0x94
|
||||||
@@ -38,6 +38,8 @@ class StreamInterface(MeshInterface):
|
|||||||
self._rxBuf = bytes() # empty
|
self._rxBuf = bytes() # empty
|
||||||
self._wantExit = False
|
self._wantExit = False
|
||||||
|
|
||||||
|
self.is_windows11 = is_windows11()
|
||||||
|
|
||||||
# FIXME, figure out why daemon=True causes reader thread to exit too early
|
# FIXME, figure out why daemon=True causes reader thread to exit too early
|
||||||
self._rxThread = threading.Thread(target=self.__reader, args=(), daemon=True)
|
self._rxThread = threading.Thread(target=self.__reader, args=(), daemon=True)
|
||||||
|
|
||||||
@@ -88,9 +90,12 @@ class StreamInterface(MeshInterface):
|
|||||||
if self.stream: # ignore writes when stream is closed
|
if self.stream: # ignore writes when stream is closed
|
||||||
self.stream.write(b)
|
self.stream.write(b)
|
||||||
self.stream.flush()
|
self.stream.flush()
|
||||||
# we sleep here to give the TBeam a chance to work
|
# win11 might need a bit more time, too
|
||||||
# also win11 might need a bit more time, too
|
if self.is_windows11:
|
||||||
time.sleep(1.0)
|
time.sleep(1.0)
|
||||||
|
else:
|
||||||
|
# we sleep here to give the TBeam a chance to work
|
||||||
|
time.sleep(0.1)
|
||||||
|
|
||||||
def _readBytes(self, length):
|
def _readBytes(self, length):
|
||||||
"""Read an array of bytes from our stream"""
|
"""Read an array of bytes from our stream"""
|
||||||
|
|||||||
@@ -149,7 +149,7 @@ def testAll(numTests=5):
|
|||||||
This is called from the cli with the "--test" option.
|
This is called from the cli with the "--test" option.
|
||||||
|
|
||||||
"""
|
"""
|
||||||
ports = meshtastic.util.findPorts()
|
ports = meshtastic.util.findPorts(True)
|
||||||
if len(ports) < 2:
|
if len(ports) < 2:
|
||||||
meshtastic.util.our_exit("Warning: Must have at least two devices connected to USB.")
|
meshtastic.util.our_exit("Warning: Must have at least two devices connected to USB.")
|
||||||
|
|
||||||
|
|||||||
@@ -160,7 +160,7 @@ def test_smoke1_send_hello():
|
|||||||
def test_smoke1_port():
|
def test_smoke1_port():
|
||||||
"""Test --port"""
|
"""Test --port"""
|
||||||
# first, get the ports
|
# first, get the ports
|
||||||
ports = findPorts()
|
ports = findPorts(True)
|
||||||
# hopefully there is just one
|
# hopefully there is just one
|
||||||
assert len(ports) == 1
|
assert len(ports) == 1
|
||||||
port = ports[0]
|
port = ports[0]
|
||||||
|
|||||||
@@ -11,7 +11,8 @@ from meshtastic.util import (fixme, stripnl, pskToString, our_exit,
|
|||||||
quoteBooleans, catchAndIgnore,
|
quoteBooleans, catchAndIgnore,
|
||||||
remove_keys_from_dict, Timeout, hexstr,
|
remove_keys_from_dict, Timeout, hexstr,
|
||||||
ipstr, readnet_u16, findPorts, convert_mac_addr,
|
ipstr, readnet_u16, findPorts, convert_mac_addr,
|
||||||
snake_to_camel, camel_to_snake)
|
snake_to_camel, camel_to_snake, eliminate_duplicate_port,
|
||||||
|
is_windows11)
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.unit
|
@pytest.mark.unit
|
||||||
@@ -247,6 +248,38 @@ def test_findPorts_when_none_found(patch_comports):
|
|||||||
patch_comports.assert_called()
|
patch_comports.assert_called()
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.unitslow
|
||||||
|
@patch('serial.tools.list_ports.comports')
|
||||||
|
def test_findPorts_when_duplicate_found_and_duplicate_option_used(patch_comports):
|
||||||
|
"""Test findPorts()"""
|
||||||
|
class TempPort:
|
||||||
|
""" temp class for port"""
|
||||||
|
def __init__(self, device=None, vid=None):
|
||||||
|
self.device = device
|
||||||
|
self.vid = vid
|
||||||
|
fake1 = TempPort('/dev/cu.usbserial-1430', vid='fake1')
|
||||||
|
fake2 = TempPort('/dev/cu.wchusbserial1430', vid='fake2')
|
||||||
|
patch_comports.return_value = [fake1, fake2]
|
||||||
|
assert findPorts(eliminate_duplicates=True) == ['/dev/cu.wchusbserial1430']
|
||||||
|
patch_comports.assert_called()
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.unitslow
|
||||||
|
@patch('serial.tools.list_ports.comports')
|
||||||
|
def test_findPorts_when_duplicate_found_and_duplicate_option_not_used(patch_comports):
|
||||||
|
"""Test findPorts()"""
|
||||||
|
class TempPort:
|
||||||
|
""" temp class for port"""
|
||||||
|
def __init__(self, device=None, vid=None):
|
||||||
|
self.device = device
|
||||||
|
self.vid = vid
|
||||||
|
fake1 = TempPort('/dev/cu.usbserial-1430', vid='fake1')
|
||||||
|
fake2 = TempPort('/dev/cu.wchusbserial1430', vid='fake2')
|
||||||
|
patch_comports.return_value = [fake1, fake2]
|
||||||
|
assert findPorts() == ['/dev/cu.usbserial-1430', '/dev/cu.wchusbserial1430']
|
||||||
|
patch_comports.assert_called()
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.unitslow
|
@pytest.mark.unitslow
|
||||||
def test_convert_mac_addr():
|
def test_convert_mac_addr():
|
||||||
"""Test convert_mac_addr()"""
|
"""Test convert_mac_addr()"""
|
||||||
@@ -272,3 +305,56 @@ def test_camel_to_snake():
|
|||||||
assert camel_to_snake('Foo') == 'foo'
|
assert camel_to_snake('Foo') == 'foo'
|
||||||
assert camel_to_snake('fooBar') == 'foo_bar'
|
assert camel_to_snake('fooBar') == 'foo_bar'
|
||||||
assert camel_to_snake('fooBarBaz') == 'foo_bar_baz'
|
assert camel_to_snake('fooBarBaz') == 'foo_bar_baz'
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.unit
|
||||||
|
def test_eliminate_duplicate_port():
|
||||||
|
"""Test eliminate_duplicate_port()"""
|
||||||
|
assert not eliminate_duplicate_port([])
|
||||||
|
assert eliminate_duplicate_port(['/dev/fake']) == ['/dev/fake']
|
||||||
|
assert eliminate_duplicate_port(['/dev/fake', '/dev/fake1']) == ['/dev/fake', '/dev/fake1']
|
||||||
|
assert eliminate_duplicate_port(['/dev/fake', '/dev/fake1', '/dev/fake2']) == ['/dev/fake', '/dev/fake1', '/dev/fake2']
|
||||||
|
assert eliminate_duplicate_port(['/dev/cu.usbserial-1430', '/dev/cu.wchusbserial1430']) == ['/dev/cu.wchusbserial1430']
|
||||||
|
assert eliminate_duplicate_port(['/dev/cu.SLAB_USBtoUART', '/dev/cu.usbserial-0001']) == ['/dev/cu.usbserial-0001']
|
||||||
|
assert eliminate_duplicate_port(['/dev/cu.usbmodem11301', '/dev/cu.wchusbserial11301']) == ['/dev/cu.wchusbserial11301']
|
||||||
|
|
||||||
|
@patch('platform.version', return_value='10.0.22000.194')
|
||||||
|
@patch('platform.release', return_value='10')
|
||||||
|
@patch('platform.system', return_value='Windows')
|
||||||
|
def test_is_windows11_true(patched_platform, patched_release, patched_version):
|
||||||
|
"""Test is_windows11()"""
|
||||||
|
assert is_windows11() is True
|
||||||
|
patched_platform.assert_called()
|
||||||
|
patched_release.assert_called()
|
||||||
|
patched_version.assert_called()
|
||||||
|
|
||||||
|
|
||||||
|
@patch('platform.version', return_value='10.0.a2200.foo') # made up
|
||||||
|
@patch('platform.release', return_value='10')
|
||||||
|
@patch('platform.system', return_value='Windows')
|
||||||
|
def test_is_windows11_true2(patched_platform, patched_release, patched_version):
|
||||||
|
"""Test is_windows11()"""
|
||||||
|
assert is_windows11() is False
|
||||||
|
patched_platform.assert_called()
|
||||||
|
patched_release.assert_called()
|
||||||
|
patched_version.assert_called()
|
||||||
|
|
||||||
|
|
||||||
|
@patch('platform.version', return_value='10.0.17763') # windows 10 home
|
||||||
|
@patch('platform.release', return_value='10')
|
||||||
|
@patch('platform.system', return_value='Windows')
|
||||||
|
def test_is_windows11_false(patched_platform, patched_release, patched_version):
|
||||||
|
"""Test is_windows11()"""
|
||||||
|
assert is_windows11() is False
|
||||||
|
patched_platform.assert_called()
|
||||||
|
patched_release.assert_called()
|
||||||
|
patched_version.assert_called()
|
||||||
|
|
||||||
|
|
||||||
|
@patch('platform.release', return_value='8.1')
|
||||||
|
@patch('platform.system', return_value='Windows')
|
||||||
|
def test_is_windows11_false_win8_1(patched_platform, patched_release):
|
||||||
|
"""Test is_windows11()"""
|
||||||
|
assert is_windows11() is False
|
||||||
|
patched_platform.assert_called()
|
||||||
|
patched_release.assert_called()
|
||||||
|
|||||||
@@ -113,8 +113,9 @@ def catchAndIgnore(reason, closure):
|
|||||||
logging.error(f"Exception thrown in {reason}: {ex}")
|
logging.error(f"Exception thrown in {reason}: {ex}")
|
||||||
|
|
||||||
|
|
||||||
def findPorts():
|
def findPorts(eliminate_duplicates=False):
|
||||||
"""Find all ports that might have meshtastic devices
|
"""Find all ports that might have meshtastic devices
|
||||||
|
eliminate_duplicates will run the eliminate_duplicate_port() on the collection
|
||||||
|
|
||||||
Returns:
|
Returns:
|
||||||
list -- a list of device paths
|
list -- a list of device paths
|
||||||
@@ -123,6 +124,8 @@ def findPorts():
|
|||||||
filter(lambda port: port.vid is not None and port.vid not in blacklistVids,
|
filter(lambda port: port.vid is not None and port.vid not in blacklistVids,
|
||||||
serial.tools.list_ports.comports())))
|
serial.tools.list_ports.comports())))
|
||||||
l.sort()
|
l.sort()
|
||||||
|
if eliminate_duplicates:
|
||||||
|
l = eliminate_duplicate_port(l)
|
||||||
return l
|
return l
|
||||||
|
|
||||||
|
|
||||||
@@ -381,3 +384,51 @@ def detect_windows_needs_driver(sd, print_reason=False):
|
|||||||
if print_reason:
|
if print_reason:
|
||||||
print(sp_output)
|
print(sp_output)
|
||||||
return need_to_install_driver
|
return need_to_install_driver
|
||||||
|
|
||||||
|
|
||||||
|
def eliminate_duplicate_port(ports):
|
||||||
|
"""Sometimes we detect 2 serial ports, but we really only need to use one of the ports.
|
||||||
|
|
||||||
|
ports is a list of ports
|
||||||
|
return a list with a single port to use, if it meets the duplicate port conditions
|
||||||
|
|
||||||
|
examples:
|
||||||
|
Ports: ['/dev/cu.usbserial-1430', '/dev/cu.wchusbserial1430'] => ['/dev/cu.wchusbserial1430']
|
||||||
|
Ports: ['/dev/cu.usbmodem11301', '/dev/cu.wchusbserial11301'] => ['/dev/cu.wchusbserial11301']
|
||||||
|
Ports: ['/dev/cu.SLAB_USBtoUART', '/dev/cu.usbserial-0001'] => ['/dev/cu.usbserial-0001']
|
||||||
|
"""
|
||||||
|
new_ports = []
|
||||||
|
if len(ports) != 2:
|
||||||
|
new_ports = ports
|
||||||
|
else:
|
||||||
|
if 'usbserial' in ports[0] and 'wchusbserial' in ports[1]:
|
||||||
|
first = ports[0].replace("usbserial-", "")
|
||||||
|
second = ports[1].replace("wchusbserial", "")
|
||||||
|
if first == second:
|
||||||
|
new_ports.append(ports[1])
|
||||||
|
elif 'usbmodem' in ports[0] and 'wchusbserial' in ports[1]:
|
||||||
|
first = ports[0].replace("usbmodem", "")
|
||||||
|
second = ports[1].replace("wchusbserial", "")
|
||||||
|
if first == second:
|
||||||
|
new_ports.append(ports[1])
|
||||||
|
elif 'SLAB_USBtoUART' in ports[0] and 'usbserial' in ports[1]:
|
||||||
|
new_ports.append(ports[1])
|
||||||
|
else:
|
||||||
|
new_ports = ports
|
||||||
|
return new_ports
|
||||||
|
|
||||||
|
|
||||||
|
def is_windows11():
|
||||||
|
"""Detect if Windows 11"""
|
||||||
|
is_win11 = False
|
||||||
|
if platform.system() == "Windows":
|
||||||
|
if float(platform.release()) >= 10.0:
|
||||||
|
patch = platform.version().split('.')[2]
|
||||||
|
# in case they add some number suffix later, just get first 5 chars of patch
|
||||||
|
patch = patch[:5]
|
||||||
|
try:
|
||||||
|
if int(patch) >= 22000:
|
||||||
|
is_win11 = True
|
||||||
|
except Exception as e:
|
||||||
|
print(f'problem detecting win11 e:{e}')
|
||||||
|
return is_win11
|
||||||
|
|||||||
2
proto
2
proto
Submodule proto updated: 2930129e8e...6a66f8b1f8
2
setup.py
2
setup.py
@@ -12,7 +12,7 @@ with open("README.md", "r") as fh:
|
|||||||
# This call to setup() does all the work
|
# This call to setup() does all the work
|
||||||
setup(
|
setup(
|
||||||
name="meshtastic",
|
name="meshtastic",
|
||||||
version="1.2.83",
|
version="1.2.85",
|
||||||
description="Python API & client shell for talking to Meshtastic devices",
|
description="Python API & client shell for talking to Meshtastic devices",
|
||||||
long_description=long_description,
|
long_description=long_description,
|
||||||
long_description_content_type="text/markdown",
|
long_description_content_type="text/markdown",
|
||||||
|
|||||||
Reference in New Issue
Block a user