mirror of
https://github.com/meshtastic/python.git
synced 2025-12-26 09:27:52 -05:00
Compare commits
25 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
4f7f38e0a7 | ||
|
|
85dca2e14e | ||
|
|
e53a5023f1 | ||
|
|
ce8b75d96d | ||
|
|
26f65c4fee | ||
|
|
1ba1e51ca4 | ||
|
|
f674afc412 | ||
|
|
db90b898e1 | ||
|
|
71621c2225 | ||
|
|
ed36fca4a2 | ||
|
|
fdd3699ba5 | ||
|
|
7d4b39643b | ||
|
|
7698cd2c7d | ||
|
|
03c744df54 | ||
|
|
90978d1f35 | ||
|
|
68a2bf271a | ||
|
|
8301384c53 | ||
|
|
045592212a | ||
|
|
45d879e607 | ||
|
|
3e44ee1eba | ||
|
|
36c5fc8d3c | ||
|
|
38ceb85ad9 | ||
|
|
6d18d9226d | ||
|
|
6d5ed2129a | ||
|
|
5cfb6ffa11 |
2
.github/workflows/ci.yml
vendored
2
.github/workflows/ci.yml
vendored
@@ -35,7 +35,7 @@ jobs:
|
||||
which meshtastic
|
||||
meshtastic --version
|
||||
- name: Run pylint
|
||||
run: pylint meshtastic
|
||||
run: pylint meshtastic examples/
|
||||
- name: Run tests with pytest
|
||||
run: pytest --cov=meshtastic
|
||||
- name: Generate coverage report
|
||||
|
||||
2
Makefile
2
Makefile
@@ -16,7 +16,7 @@ docs:
|
||||
|
||||
# lint the codebase
|
||||
lint:
|
||||
pylint meshtastic
|
||||
pylint meshtastic examples
|
||||
|
||||
# show the slowest unit tests
|
||||
slow:
|
||||
|
||||
23
examples/info_example.py
Normal file
23
examples/info_example.py
Normal file
@@ -0,0 +1,23 @@
|
||||
"""Simple program to demo how to use meshtastic library.
|
||||
To run: python examples/info.py
|
||||
"""
|
||||
|
||||
import meshtastic
|
||||
import meshtastic.serial_interface
|
||||
|
||||
iface = meshtastic.serial_interface.SerialInterface()
|
||||
|
||||
# call showInfo() just to ensure values are populated
|
||||
#info = iface.showInfo()
|
||||
|
||||
if iface.myInfo:
|
||||
#print(f'myInfo:{iface.myInfo}')
|
||||
print(f'firmware_version:{iface.myInfo.firmware_version}')
|
||||
|
||||
if iface.nodes:
|
||||
for n in iface.nodes.values():
|
||||
if n['num'] == iface.myInfo.my_node_num:
|
||||
print(n['user']['hwModel'])
|
||||
break
|
||||
|
||||
iface.close()
|
||||
@@ -13,7 +13,7 @@ if len(sys.argv) < 2:
|
||||
print(f"usage: {sys.argv[0]} host")
|
||||
sys.exit(1)
|
||||
|
||||
def onConnection(interface, topic=pub.AUTO_TOPIC):
|
||||
def onConnection(interface, topic=pub.AUTO_TOPIC): # pylint: disable=unused-argument
|
||||
"""This is called when we (re)connect to the radio."""
|
||||
print(interface.myInfo)
|
||||
interface.close()
|
||||
|
||||
@@ -14,11 +14,11 @@ if len(sys.argv) < 2:
|
||||
print(f"usage: {sys.argv[0]} host")
|
||||
sys.exit(1)
|
||||
|
||||
def onReceive(packet, interface):
|
||||
def onReceive(packet, interface): # pylint: disable=unused-argument
|
||||
"""called when a packet arrives"""
|
||||
print(f"Received: {packet}")
|
||||
|
||||
def onConnection(interface, topic=pub.AUTO_TOPIC):
|
||||
def onConnection(interface, topic=pub.AUTO_TOPIC): # pylint: disable=unused-argument
|
||||
"""called when we (re)connect to the radio"""
|
||||
# defaults to broadcast, specify a destination ID if you wish
|
||||
interface.sendText("hello mesh")
|
||||
|
||||
@@ -3,7 +3,6 @@
|
||||
"""
|
||||
|
||||
import sys
|
||||
import meshtastic
|
||||
from meshtastic.supported_device import get_unique_vendor_ids, active_ports_on_supported_devices
|
||||
from meshtastic.util import detect_supported_devices
|
||||
|
||||
|
||||
52
info/mac/rak11200.txt
Normal file
52
info/mac/rak11200.txt
Normal file
@@ -0,0 +1,52 @@
|
||||
|
||||
|
||||
% ioreg -p IOUSB > /tmp/a
|
||||
# only a solid red light (pins GRND and BOOT0 jumpered)
|
||||
|
||||
% ioreg -p IOUSB > /tmp/b
|
||||
# solid red light and solid green light
|
||||
|
||||
% ioreg -p IOUSB > /tmp/c
|
||||
# nothing plugged in
|
||||
|
||||
|
||||
|
||||
% diff /tmp/a /tmp/c
|
||||
13c13
|
||||
< +-o AppleUSBXHCI Root Hub Simulation@14000000 <class AppleUSBRootHubDevice, id 0x100000589, registered, matched, active, busy 0 (15 ms), retain 11>
|
||||
---
|
||||
> +-o AppleUSBXHCI Root Hub Simulation@14000000 <class AppleUSBRootHubDevice, id 0x100000589, registered, matched, active, busy 0 (15 ms), retain 10>
|
||||
18d17
|
||||
< +-o USB Serial@14300000 <class AppleUSBDevice, id 0x100004ca4, registered, matched, active, busy 0 (10 ms), retain 12>
|
||||
|
||||
|
||||
% diff /tmp/b /tmp/c
|
||||
13c13
|
||||
< +-o AppleUSBXHCI Root Hub Simulation@14000000 <class AppleUSBRootHubDevice, id 0x100000589, registered, matched, active, busy 0 (15 ms), retain 11>
|
||||
---
|
||||
> +-o AppleUSBXHCI Root Hub Simulation@14000000 <class AppleUSBRootHubDevice, id 0x100000589, registered, matched, active, busy 0 (15 ms), retain 10>
|
||||
18d17
|
||||
< +-o USB Serial@14300000 <class AppleUSBDevice, id 0x100004ce5, registered, matched, active, busy 0 (11 ms), retain 12>
|
||||
|
||||
|
||||
|
||||
system_profiler SPUSBDataType > /tmp/d
|
||||
# red solid
|
||||
|
||||
|
||||
system_profiler SPUSBDataType > /tmp/e
|
||||
# nothing
|
||||
|
||||
|
||||
% diff /tmp/d /tmp/e
|
||||
38,48d37
|
||||
< USB Serial:
|
||||
<
|
||||
< Product ID: 0x7523
|
||||
< Vendor ID: 0x1a86
|
||||
< Version: 2.64
|
||||
< Speed: Up to 12 Mb/s
|
||||
< Location ID: 0x14300000 / 33
|
||||
< Current Available (mA): 500
|
||||
< Current Required (mA): 98
|
||||
< Extra Operating Current (mA): 0
|
||||
@@ -30,3 +30,11 @@ drwxrwxrwx 1 bob staff 512 Feb 1 16:47 .fseventsd
|
||||
% diff /tmp/a /tmp/c
|
||||
<snip>
|
||||
> | +-o T-Echo v1@14300000 <class AppleUSBDevice, id 0x100061000, registered, matched, active, busy 0 (25 ms), retain 16>
|
||||
|
||||
contents of: INFO_UF2.TXT
|
||||
|
||||
UF2 Bootloader 0.6.1-2-g1224915 lib/nrfx (v2.0.0) lib/tinyusb (0.10.1-293-gaf8e5a90) lib/uf2 (remotes/origin/configupdate-9-gadbb8c7)
|
||||
Model: LilyGo T-Echo
|
||||
Board-ID: nRF52840-TEcho-v1
|
||||
SoftDevice: S140 version 6.1.1
|
||||
Date: Oct 13 2021
|
||||
|
||||
@@ -24,8 +24,10 @@ class SerialInterface(StreamInterface):
|
||||
"""
|
||||
self.noProto = noProto
|
||||
|
||||
if devPath is None:
|
||||
ports = meshtastic.util.findPorts()
|
||||
self.devPath = devPath
|
||||
|
||||
if self.devPath is None:
|
||||
ports = meshtastic.util.findPorts(True)
|
||||
logging.debug(f"ports:{ports}")
|
||||
if len(ports) == 0:
|
||||
meshtastic.util.our_exit("Warning: No Meshtastic devices detected.")
|
||||
@@ -34,21 +36,21 @@ class SerialInterface(StreamInterface):
|
||||
message += f" Ports detected:{ports}"
|
||||
meshtastic.util.our_exit(message)
|
||||
else:
|
||||
devPath = ports[0]
|
||||
self.devPath = ports[0]
|
||||
|
||||
logging.debug(f"Connecting to {devPath}")
|
||||
logging.debug(f"Connecting to {self.devPath}")
|
||||
|
||||
# first we need to set the HUPCL so the device will not reboot based on RTS and/or DTR
|
||||
# see https://github.com/pyserial/pyserial/issues/124
|
||||
if platform.system() != 'Windows':
|
||||
with open(devPath, encoding='utf8') as f:
|
||||
with open(self.devPath, encoding='utf8') as f:
|
||||
attrs = termios.tcgetattr(f)
|
||||
attrs[2] = attrs[2] & ~termios.HUPCL
|
||||
termios.tcsetattr(f, termios.TCSAFLUSH, attrs)
|
||||
f.close()
|
||||
time.sleep(0.1)
|
||||
|
||||
self.stream = serial.Serial(devPath, 921600, exclusive=True, timeout=0.5, write_timeout=0)
|
||||
self.stream = serial.Serial(self.devPath, 921600, exclusive=True, timeout=0.5, write_timeout=0)
|
||||
self.stream.flush()
|
||||
time.sleep(0.1)
|
||||
|
||||
|
||||
@@ -8,7 +8,7 @@ import serial
|
||||
|
||||
|
||||
from meshtastic.mesh_interface import MeshInterface
|
||||
from meshtastic.util import stripnl
|
||||
from meshtastic.util import stripnl, is_windows11
|
||||
|
||||
|
||||
START1 = 0x94
|
||||
@@ -38,6 +38,8 @@ class StreamInterface(MeshInterface):
|
||||
self._rxBuf = bytes() # empty
|
||||
self._wantExit = False
|
||||
|
||||
self.is_windows11 = is_windows11()
|
||||
|
||||
# FIXME, figure out why daemon=True causes reader thread to exit too early
|
||||
self._rxThread = threading.Thread(target=self.__reader, args=(), daemon=True)
|
||||
|
||||
@@ -88,8 +90,12 @@ class StreamInterface(MeshInterface):
|
||||
if self.stream: # ignore writes when stream is closed
|
||||
self.stream.write(b)
|
||||
self.stream.flush()
|
||||
# we sleep here to give the TBeam a chance to work
|
||||
time.sleep(0.1)
|
||||
# win11 might need a bit more time, too
|
||||
if self.is_windows11:
|
||||
time.sleep(1.0)
|
||||
else:
|
||||
# we sleep here to give the TBeam a chance to work
|
||||
time.sleep(0.1)
|
||||
|
||||
def _readBytes(self, length):
|
||||
"""Read an array of bytes from our stream"""
|
||||
|
||||
@@ -66,6 +66,9 @@ heltec_v2_0 = SupportedDevice(name="Heltec", version="2.0", for_firmware="heltec
|
||||
heltec_v2_1 = SupportedDevice(name="Heltec", version="2.1", for_firmware="heltec-v2.1",
|
||||
baseport_on_linux="ttyUSB", baseport_on_mac="cu.usbserial-",
|
||||
usb_vendor_id_in_hex="10c4", usb_product_id_in_hex="ea60")
|
||||
rak11200 = SupportedDevice(name="RAK 11200", version="", for_firmware="rak11200",
|
||||
baseport_on_linux="ttyUSB", baseport_on_mac="cu.usbserial-",
|
||||
usb_vendor_id_in_hex="1a86", usb_product_id_in_hex="7523")
|
||||
meshtastic_diy_v1 = SupportedDevice(name="Meshtastic DIY", version="1", for_firmware="meshtastic-diy-v1",
|
||||
baseport_on_linux="ttyUSB", baseport_on_mac="cu.usbserial-",
|
||||
usb_vendor_id_in_hex="10c4", usb_product_id_in_hex="ea60")
|
||||
@@ -86,7 +89,8 @@ rak4631_19003 = SupportedDevice(name="RAK 4631 19003", version="", for_firmware=
|
||||
supported_devices = [tbeam_v0_7, tbeam_v1_1, tbeam_M8N, tbeam_M8N_SX1262,
|
||||
tlora_v1_1, tlora_v1_3, tlora_v2_0, tlora_v2_1, tlora_v2_1_1_6,
|
||||
heltec_v1, heltec_v2_0, heltec_v2_1,
|
||||
meshtastic_diy_v1, techo_1, rak4631_5005, rak4631_19003]
|
||||
meshtastic_diy_v1, techo_1, rak4631_5005, rak4631_19003,
|
||||
rak11200]
|
||||
|
||||
|
||||
def get_unique_vendor_ids():
|
||||
|
||||
@@ -149,7 +149,7 @@ def testAll(numTests=5):
|
||||
This is called from the cli with the "--test" option.
|
||||
|
||||
"""
|
||||
ports = meshtastic.util.findPorts()
|
||||
ports = meshtastic.util.findPorts(True)
|
||||
if len(ports) < 2:
|
||||
meshtastic.util.our_exit("Warning: Must have at least two devices connected to USB.")
|
||||
|
||||
|
||||
@@ -160,7 +160,7 @@ def test_smoke1_send_hello():
|
||||
def test_smoke1_port():
|
||||
"""Test --port"""
|
||||
# first, get the ports
|
||||
ports = findPorts()
|
||||
ports = findPorts(True)
|
||||
# hopefully there is just one
|
||||
assert len(ports) == 1
|
||||
port = ports[0]
|
||||
|
||||
@@ -11,7 +11,8 @@ from meshtastic.util import (fixme, stripnl, pskToString, our_exit,
|
||||
quoteBooleans, catchAndIgnore,
|
||||
remove_keys_from_dict, Timeout, hexstr,
|
||||
ipstr, readnet_u16, findPorts, convert_mac_addr,
|
||||
snake_to_camel, camel_to_snake)
|
||||
snake_to_camel, camel_to_snake, eliminate_duplicate_port,
|
||||
is_windows11)
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
@@ -247,6 +248,38 @@ def test_findPorts_when_none_found(patch_comports):
|
||||
patch_comports.assert_called()
|
||||
|
||||
|
||||
@pytest.mark.unitslow
|
||||
@patch('serial.tools.list_ports.comports')
|
||||
def test_findPorts_when_duplicate_found_and_duplicate_option_used(patch_comports):
|
||||
"""Test findPorts()"""
|
||||
class TempPort:
|
||||
""" temp class for port"""
|
||||
def __init__(self, device=None, vid=None):
|
||||
self.device = device
|
||||
self.vid = vid
|
||||
fake1 = TempPort('/dev/cu.usbserial-1430', vid='fake1')
|
||||
fake2 = TempPort('/dev/cu.wchusbserial1430', vid='fake2')
|
||||
patch_comports.return_value = [fake1, fake2]
|
||||
assert findPorts(eliminate_duplicates=True) == ['/dev/cu.wchusbserial1430']
|
||||
patch_comports.assert_called()
|
||||
|
||||
|
||||
@pytest.mark.unitslow
|
||||
@patch('serial.tools.list_ports.comports')
|
||||
def test_findPorts_when_duplicate_found_and_duplicate_option_not_used(patch_comports):
|
||||
"""Test findPorts()"""
|
||||
class TempPort:
|
||||
""" temp class for port"""
|
||||
def __init__(self, device=None, vid=None):
|
||||
self.device = device
|
||||
self.vid = vid
|
||||
fake1 = TempPort('/dev/cu.usbserial-1430', vid='fake1')
|
||||
fake2 = TempPort('/dev/cu.wchusbserial1430', vid='fake2')
|
||||
patch_comports.return_value = [fake1, fake2]
|
||||
assert findPorts() == ['/dev/cu.usbserial-1430', '/dev/cu.wchusbserial1430']
|
||||
patch_comports.assert_called()
|
||||
|
||||
|
||||
@pytest.mark.unitslow
|
||||
def test_convert_mac_addr():
|
||||
"""Test convert_mac_addr()"""
|
||||
@@ -272,3 +305,56 @@ def test_camel_to_snake():
|
||||
assert camel_to_snake('Foo') == 'foo'
|
||||
assert camel_to_snake('fooBar') == 'foo_bar'
|
||||
assert camel_to_snake('fooBarBaz') == 'foo_bar_baz'
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
def test_eliminate_duplicate_port():
|
||||
"""Test eliminate_duplicate_port()"""
|
||||
assert not eliminate_duplicate_port([])
|
||||
assert eliminate_duplicate_port(['/dev/fake']) == ['/dev/fake']
|
||||
assert eliminate_duplicate_port(['/dev/fake', '/dev/fake1']) == ['/dev/fake', '/dev/fake1']
|
||||
assert eliminate_duplicate_port(['/dev/fake', '/dev/fake1', '/dev/fake2']) == ['/dev/fake', '/dev/fake1', '/dev/fake2']
|
||||
assert eliminate_duplicate_port(['/dev/cu.usbserial-1430', '/dev/cu.wchusbserial1430']) == ['/dev/cu.wchusbserial1430']
|
||||
assert eliminate_duplicate_port(['/dev/cu.SLAB_USBtoUART', '/dev/cu.usbserial-0001']) == ['/dev/cu.usbserial-0001']
|
||||
assert eliminate_duplicate_port(['/dev/cu.usbmodem11301', '/dev/cu.wchusbserial11301']) == ['/dev/cu.wchusbserial11301']
|
||||
|
||||
@patch('platform.version', return_value='10.0.22000.194')
|
||||
@patch('platform.release', return_value='10')
|
||||
@patch('platform.system', return_value='Windows')
|
||||
def test_is_windows11_true(patched_platform, patched_release, patched_version):
|
||||
"""Test is_windows11()"""
|
||||
assert is_windows11() is True
|
||||
patched_platform.assert_called()
|
||||
patched_release.assert_called()
|
||||
patched_version.assert_called()
|
||||
|
||||
|
||||
@patch('platform.version', return_value='10.0.a2200.foo') # made up
|
||||
@patch('platform.release', return_value='10')
|
||||
@patch('platform.system', return_value='Windows')
|
||||
def test_is_windows11_true2(patched_platform, patched_release, patched_version):
|
||||
"""Test is_windows11()"""
|
||||
assert is_windows11() is False
|
||||
patched_platform.assert_called()
|
||||
patched_release.assert_called()
|
||||
patched_version.assert_called()
|
||||
|
||||
|
||||
@patch('platform.version', return_value='10.0.17763') # windows 10 home
|
||||
@patch('platform.release', return_value='10')
|
||||
@patch('platform.system', return_value='Windows')
|
||||
def test_is_windows11_false(patched_platform, patched_release, patched_version):
|
||||
"""Test is_windows11()"""
|
||||
assert is_windows11() is False
|
||||
patched_platform.assert_called()
|
||||
patched_release.assert_called()
|
||||
patched_version.assert_called()
|
||||
|
||||
|
||||
@patch('platform.release', return_value='8.1')
|
||||
@patch('platform.system', return_value='Windows')
|
||||
def test_is_windows11_false_win8_1(patched_platform, patched_release):
|
||||
"""Test is_windows11()"""
|
||||
assert is_windows11() is False
|
||||
patched_platform.assert_called()
|
||||
patched_release.assert_called()
|
||||
|
||||
@@ -113,8 +113,9 @@ def catchAndIgnore(reason, closure):
|
||||
logging.error(f"Exception thrown in {reason}: {ex}")
|
||||
|
||||
|
||||
def findPorts():
|
||||
def findPorts(eliminate_duplicates=False):
|
||||
"""Find all ports that might have meshtastic devices
|
||||
eliminate_duplicates will run the eliminate_duplicate_port() on the collection
|
||||
|
||||
Returns:
|
||||
list -- a list of device paths
|
||||
@@ -123,6 +124,8 @@ def findPorts():
|
||||
filter(lambda port: port.vid is not None and port.vid not in blacklistVids,
|
||||
serial.tools.list_ports.comports())))
|
||||
l.sort()
|
||||
if eliminate_duplicates:
|
||||
l = eliminate_duplicate_port(l)
|
||||
return l
|
||||
|
||||
|
||||
@@ -381,3 +384,51 @@ def detect_windows_needs_driver(sd, print_reason=False):
|
||||
if print_reason:
|
||||
print(sp_output)
|
||||
return need_to_install_driver
|
||||
|
||||
|
||||
def eliminate_duplicate_port(ports):
|
||||
"""Sometimes we detect 2 serial ports, but we really only need to use one of the ports.
|
||||
|
||||
ports is a list of ports
|
||||
return a list with a single port to use, if it meets the duplicate port conditions
|
||||
|
||||
examples:
|
||||
Ports: ['/dev/cu.usbserial-1430', '/dev/cu.wchusbserial1430'] => ['/dev/cu.wchusbserial1430']
|
||||
Ports: ['/dev/cu.usbmodem11301', '/dev/cu.wchusbserial11301'] => ['/dev/cu.wchusbserial11301']
|
||||
Ports: ['/dev/cu.SLAB_USBtoUART', '/dev/cu.usbserial-0001'] => ['/dev/cu.usbserial-0001']
|
||||
"""
|
||||
new_ports = []
|
||||
if len(ports) != 2:
|
||||
new_ports = ports
|
||||
else:
|
||||
if 'usbserial' in ports[0] and 'wchusbserial' in ports[1]:
|
||||
first = ports[0].replace("usbserial-", "")
|
||||
second = ports[1].replace("wchusbserial", "")
|
||||
if first == second:
|
||||
new_ports.append(ports[1])
|
||||
elif 'usbmodem' in ports[0] and 'wchusbserial' in ports[1]:
|
||||
first = ports[0].replace("usbmodem", "")
|
||||
second = ports[1].replace("wchusbserial", "")
|
||||
if first == second:
|
||||
new_ports.append(ports[1])
|
||||
elif 'SLAB_USBtoUART' in ports[0] and 'usbserial' in ports[1]:
|
||||
new_ports.append(ports[1])
|
||||
else:
|
||||
new_ports = ports
|
||||
return new_ports
|
||||
|
||||
|
||||
def is_windows11():
|
||||
"""Detect if Windows 11"""
|
||||
is_win11 = False
|
||||
if platform.system() == "Windows":
|
||||
if float(platform.release()) >= 10.0:
|
||||
patch = platform.version().split('.')[2]
|
||||
# in case they add some number suffix later, just get first 5 chars of patch
|
||||
patch = patch[:5]
|
||||
try:
|
||||
if int(patch) >= 22000:
|
||||
is_win11 = True
|
||||
except Exception as e:
|
||||
print(f'problem detecting win11 e:{e}')
|
||||
return is_win11
|
||||
|
||||
2
proto
2
proto
Submodule proto updated: 2930129e8e...6a66f8b1f8
2
setup.py
2
setup.py
@@ -12,7 +12,7 @@ with open("README.md", "r") as fh:
|
||||
# This call to setup() does all the work
|
||||
setup(
|
||||
name="meshtastic",
|
||||
version="1.2.81",
|
||||
version="1.2.85",
|
||||
description="Python API & client shell for talking to Meshtastic devices",
|
||||
long_description=long_description,
|
||||
long_description_content_type="text/markdown",
|
||||
|
||||
Reference in New Issue
Block a user