Compare commits

...

7 Commits

Author SHA1 Message Date
mkinney
4f7f38e0a7 Update setup.py 2022-02-18 11:16:49 -08:00
mkinney
85dca2e14e Merge pull request #278 from mkinney/fix_smoke1
add detection of duplicate ports to findPorts; fix smoke1 test
2022-02-18 11:16:25 -08:00
Mike Kinney
e53a5023f1 add detection of duplicate ports to findPorts; fix smoke1 test 2022-02-18 11:13:48 -08:00
mkinney
ce8b75d96d Merge pull request #277 from mkinney/win11_delay_only
only delay on win11
2022-02-18 10:12:16 -08:00
Mike Kinney
26f65c4fee only delay on win11 2022-02-18 10:08:53 -08:00
mkinney
1ba1e51ca4 Merge pull request #276 from mkinney/keep_devpath_on_serial
keep the devPath used on serial connections
2022-02-18 10:03:43 -08:00
Mike Kinney
f674afc412 keep the devPath used on serial connections 2022-02-18 09:59:09 -08:00
7 changed files with 118 additions and 22 deletions

View File

@@ -24,35 +24,33 @@ class SerialInterface(StreamInterface):
"""
self.noProto = noProto
if devPath is None:
ports = meshtastic.util.findPorts()
self.devPath = devPath
if self.devPath is None:
ports = meshtastic.util.findPorts(True)
logging.debug(f"ports:{ports}")
if len(ports) == 0:
meshtastic.util.our_exit("Warning: No Meshtastic devices detected.")
elif len(ports) > 1:
tmp_ports = meshtastic.util.eliminate_duplicate_port(ports)
if len(tmp_ports) != 1:
message = "Warning: Multiple serial ports were detected so one serial port must be specified with the '--port'.\n"
message += f" Ports detected:{ports}"
meshtastic.util.our_exit(message)
else:
devPath = tmp_ports[0]
message = "Warning: Multiple serial ports were detected so one serial port must be specified with the '--port'.\n"
message += f" Ports detected:{ports}"
meshtastic.util.our_exit(message)
else:
devPath = ports[0]
self.devPath = ports[0]
logging.debug(f"Connecting to {devPath}")
logging.debug(f"Connecting to {self.devPath}")
# first we need to set the HUPCL so the device will not reboot based on RTS and/or DTR
# see https://github.com/pyserial/pyserial/issues/124
if platform.system() != 'Windows':
with open(devPath, encoding='utf8') as f:
with open(self.devPath, encoding='utf8') as f:
attrs = termios.tcgetattr(f)
attrs[2] = attrs[2] & ~termios.HUPCL
termios.tcsetattr(f, termios.TCSAFLUSH, attrs)
f.close()
time.sleep(0.1)
self.stream = serial.Serial(devPath, 921600, exclusive=True, timeout=0.5, write_timeout=0)
self.stream = serial.Serial(self.devPath, 921600, exclusive=True, timeout=0.5, write_timeout=0)
self.stream.flush()
time.sleep(0.1)

View File

@@ -8,7 +8,7 @@ import serial
from meshtastic.mesh_interface import MeshInterface
from meshtastic.util import stripnl
from meshtastic.util import stripnl, is_windows11
START1 = 0x94
@@ -38,6 +38,8 @@ class StreamInterface(MeshInterface):
self._rxBuf = bytes() # empty
self._wantExit = False
self.is_windows11 = is_windows11()
# FIXME, figure out why daemon=True causes reader thread to exit too early
self._rxThread = threading.Thread(target=self.__reader, args=(), daemon=True)
@@ -88,9 +90,12 @@ class StreamInterface(MeshInterface):
if self.stream: # ignore writes when stream is closed
self.stream.write(b)
self.stream.flush()
# we sleep here to give the TBeam a chance to work
# also win11 might need a bit more time, too
time.sleep(1.0)
# win11 might need a bit more time, too
if self.is_windows11:
time.sleep(1.0)
else:
# we sleep here to give the TBeam a chance to work
time.sleep(0.1)
def _readBytes(self, length):
"""Read an array of bytes from our stream"""

View File

@@ -149,7 +149,7 @@ def testAll(numTests=5):
This is called from the cli with the "--test" option.
"""
ports = meshtastic.util.findPorts()
ports = meshtastic.util.findPorts(True)
if len(ports) < 2:
meshtastic.util.our_exit("Warning: Must have at least two devices connected to USB.")

View File

@@ -160,7 +160,7 @@ def test_smoke1_send_hello():
def test_smoke1_port():
"""Test --port"""
# first, get the ports
ports = findPorts()
ports = findPorts(True)
# hopefully there is just one
assert len(ports) == 1
port = ports[0]

View File

@@ -11,7 +11,8 @@ from meshtastic.util import (fixme, stripnl, pskToString, our_exit,
quoteBooleans, catchAndIgnore,
remove_keys_from_dict, Timeout, hexstr,
ipstr, readnet_u16, findPorts, convert_mac_addr,
snake_to_camel, camel_to_snake, eliminate_duplicate_port)
snake_to_camel, camel_to_snake, eliminate_duplicate_port,
is_windows11)
@pytest.mark.unit
@@ -247,6 +248,38 @@ def test_findPorts_when_none_found(patch_comports):
patch_comports.assert_called()
@pytest.mark.unitslow
@patch('serial.tools.list_ports.comports')
def test_findPorts_when_duplicate_found_and_duplicate_option_used(patch_comports):
"""Test findPorts()"""
class TempPort:
""" temp class for port"""
def __init__(self, device=None, vid=None):
self.device = device
self.vid = vid
fake1 = TempPort('/dev/cu.usbserial-1430', vid='fake1')
fake2 = TempPort('/dev/cu.wchusbserial1430', vid='fake2')
patch_comports.return_value = [fake1, fake2]
assert findPorts(eliminate_duplicates=True) == ['/dev/cu.wchusbserial1430']
patch_comports.assert_called()
@pytest.mark.unitslow
@patch('serial.tools.list_ports.comports')
def test_findPorts_when_duplicate_found_and_duplicate_option_not_used(patch_comports):
"""Test findPorts()"""
class TempPort:
""" temp class for port"""
def __init__(self, device=None, vid=None):
self.device = device
self.vid = vid
fake1 = TempPort('/dev/cu.usbserial-1430', vid='fake1')
fake2 = TempPort('/dev/cu.wchusbserial1430', vid='fake2')
patch_comports.return_value = [fake1, fake2]
assert findPorts() == ['/dev/cu.usbserial-1430', '/dev/cu.wchusbserial1430']
patch_comports.assert_called()
@pytest.mark.unitslow
def test_convert_mac_addr():
"""Test convert_mac_addr()"""
@@ -284,3 +317,44 @@ def test_eliminate_duplicate_port():
assert eliminate_duplicate_port(['/dev/cu.usbserial-1430', '/dev/cu.wchusbserial1430']) == ['/dev/cu.wchusbserial1430']
assert eliminate_duplicate_port(['/dev/cu.SLAB_USBtoUART', '/dev/cu.usbserial-0001']) == ['/dev/cu.usbserial-0001']
assert eliminate_duplicate_port(['/dev/cu.usbmodem11301', '/dev/cu.wchusbserial11301']) == ['/dev/cu.wchusbserial11301']
@patch('platform.version', return_value='10.0.22000.194')
@patch('platform.release', return_value='10')
@patch('platform.system', return_value='Windows')
def test_is_windows11_true(patched_platform, patched_release, patched_version):
"""Test is_windows11()"""
assert is_windows11() is True
patched_platform.assert_called()
patched_release.assert_called()
patched_version.assert_called()
@patch('platform.version', return_value='10.0.a2200.foo') # made up
@patch('platform.release', return_value='10')
@patch('platform.system', return_value='Windows')
def test_is_windows11_true2(patched_platform, patched_release, patched_version):
"""Test is_windows11()"""
assert is_windows11() is False
patched_platform.assert_called()
patched_release.assert_called()
patched_version.assert_called()
@patch('platform.version', return_value='10.0.17763') # windows 10 home
@patch('platform.release', return_value='10')
@patch('platform.system', return_value='Windows')
def test_is_windows11_false(patched_platform, patched_release, patched_version):
"""Test is_windows11()"""
assert is_windows11() is False
patched_platform.assert_called()
patched_release.assert_called()
patched_version.assert_called()
@patch('platform.release', return_value='8.1')
@patch('platform.system', return_value='Windows')
def test_is_windows11_false_win8_1(patched_platform, patched_release):
"""Test is_windows11()"""
assert is_windows11() is False
patched_platform.assert_called()
patched_release.assert_called()

View File

@@ -113,8 +113,9 @@ def catchAndIgnore(reason, closure):
logging.error(f"Exception thrown in {reason}: {ex}")
def findPorts():
def findPorts(eliminate_duplicates=False):
"""Find all ports that might have meshtastic devices
eliminate_duplicates will run the eliminate_duplicate_port() on the collection
Returns:
list -- a list of device paths
@@ -123,6 +124,8 @@ def findPorts():
filter(lambda port: port.vid is not None and port.vid not in blacklistVids,
serial.tools.list_ports.comports())))
l.sort()
if eliminate_duplicates:
l = eliminate_duplicate_port(l)
return l
@@ -413,3 +416,19 @@ def eliminate_duplicate_port(ports):
else:
new_ports = ports
return new_ports
def is_windows11():
"""Detect if Windows 11"""
is_win11 = False
if platform.system() == "Windows":
if float(platform.release()) >= 10.0:
patch = platform.version().split('.')[2]
# in case they add some number suffix later, just get first 5 chars of patch
patch = patch[:5]
try:
if int(patch) >= 22000:
is_win11 = True
except Exception as e:
print(f'problem detecting win11 e:{e}')
return is_win11

View File

@@ -12,7 +12,7 @@ with open("README.md", "r") as fh:
# This call to setup() does all the work
setup(
name="meshtastic",
version="1.2.84",
version="1.2.85",
description="Python API & client shell for talking to Meshtastic devices",
long_description=long_description,
long_description_content_type="text/markdown",