mirror of
https://github.com/meshtastic/python.git
synced 2025-12-26 01:17:51 -05:00
Compare commits
38 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
aff3bdd78e | ||
|
|
e9a8e26e76 | ||
|
|
83439679c1 | ||
|
|
968027a439 | ||
|
|
cc24b6ebc5 | ||
|
|
db09b4718d | ||
|
|
e0edbc6288 | ||
|
|
ae9ae91af5 | ||
|
|
7921db007b | ||
|
|
e85af2f9e9 | ||
|
|
8ccc64f92e | ||
|
|
afb21c6dc3 | ||
|
|
3291bc7097 | ||
|
|
a7d56504be | ||
|
|
90e5b473d9 | ||
|
|
52834e9966 | ||
|
|
63c60d4cea | ||
|
|
6a2a9d2093 | ||
|
|
1410448808 | ||
|
|
ad8f2222db | ||
|
|
48265e73b1 | ||
|
|
f3139a8aa0 | ||
|
|
8d68e36703 | ||
|
|
d2d93fbe80 | ||
|
|
ed8510468d | ||
|
|
e7680e07c2 | ||
|
|
0f89baa36e | ||
|
|
48ed7690af | ||
|
|
59b94ea650 | ||
|
|
5b992734fb | ||
|
|
3b74b911f8 | ||
|
|
b6570e3c27 | ||
|
|
e1e1664b96 | ||
|
|
cb1913dfc3 | ||
|
|
b813a6f8c5 | ||
|
|
0b662318e1 | ||
|
|
a6ccc1a246 | ||
|
|
bc17e9b389 |
@@ -23,7 +23,7 @@ ignore-patterns=mqtt_pb2.py,channel_pb2.py,environmental_measurement_pb2.py,admi
|
||||
# no Warning level messages displayed, use"--disable=all --enable=classes
|
||||
# --disable=W"
|
||||
#
|
||||
disable=invalid-name,fixme,logging-fstring-interpolation,too-many-statements,too-many-branches,too-many-locals,no-member,f-string-without-interpolation,protected-access,no-self-use,pointless-string-statement,too-few-public-methods,consider-using-f-string,broad-except,no-else-return,unused-argument,global-statement,global-variable-not-assigned,too-many-boolean-expressions,no-else-raise,bare-except,c-extension-no-member
|
||||
disable=invalid-name,fixme,logging-fstring-interpolation,too-many-statements,too-many-branches,too-many-locals,no-member,f-string-without-interpolation,protected-access,no-self-use,pointless-string-statement,too-few-public-methods,broad-except,no-else-return,no-else-raise,bare-except
|
||||
|
||||
|
||||
[BASIC]
|
||||
|
||||
16
exampleConfig.yaml
Normal file
16
exampleConfig.yaml
Normal file
@@ -0,0 +1,16 @@
|
||||
# example config using camelCase keys
|
||||
owner: Bob TBeam
|
||||
|
||||
channelUrl: https://www.meshtastic.org/d/#CgUYAyIBAQ
|
||||
|
||||
location:
|
||||
lat: 35.88888
|
||||
lon: -93.88888
|
||||
alt: 304
|
||||
|
||||
userPrefs:
|
||||
region: 1
|
||||
isAlwaysPowered: 'true'
|
||||
sendOwnerInterval: 2
|
||||
screenOnSecs: 31536000
|
||||
waitBluetoothSecs: 31536000
|
||||
@@ -1,3 +1,4 @@
|
||||
# example configuration file with snake_case keys
|
||||
owner: Bob TBeam
|
||||
|
||||
channel_url: https://www.meshtastic.org/d/#CgUYAyIBAQ
|
||||
|
||||
20
examples/get_hw.py
Normal file
20
examples/get_hw.py
Normal file
@@ -0,0 +1,20 @@
|
||||
"""Simple program to demo how to use meshtastic library.
|
||||
To run: python examples/get_hw.py
|
||||
"""
|
||||
|
||||
import sys
|
||||
import meshtastic
|
||||
import meshtastic.serial_interface
|
||||
|
||||
# simple arg check
|
||||
if len(sys.argv) != 1:
|
||||
print(f"usage: {sys.argv[0]}")
|
||||
print("Print the hardware model for the local node.")
|
||||
sys.exit(3)
|
||||
|
||||
iface = meshtastic.serial_interface.SerialInterface()
|
||||
if iface.nodes:
|
||||
for n in iface.nodes.values():
|
||||
if n['num'] == iface.myInfo.my_node_num:
|
||||
print(n['user']['hwModel'])
|
||||
iface.close()
|
||||
20
examples/set_owner.py
Normal file
20
examples/set_owner.py
Normal file
@@ -0,0 +1,20 @@
|
||||
"""Simple program to demo how to use meshtastic library.
|
||||
To run: python examples/set_owner.py Bobby 333
|
||||
"""
|
||||
|
||||
import sys
|
||||
import meshtastic
|
||||
import meshtastic.serial_interface
|
||||
|
||||
# simple arg check
|
||||
if len(sys.argv) < 2:
|
||||
print(f"usage: {sys.argv[0]} long_name [short_name]")
|
||||
sys.exit(3)
|
||||
|
||||
iface = meshtastic.serial_interface.SerialInterface()
|
||||
long_name = sys.argv[1]
|
||||
short_name = None
|
||||
if len(sys.argv) > 2:
|
||||
short_name = sys.argv[2]
|
||||
iface.localNode.setOwner(long_name, short_name)
|
||||
iface.close()
|
||||
@@ -72,7 +72,6 @@ from typing import *
|
||||
import serial
|
||||
import timeago
|
||||
import google.protobuf.json_format
|
||||
import pygatt
|
||||
from pubsub import pub
|
||||
from dotmap import DotMap
|
||||
from tabulate import tabulate
|
||||
@@ -83,6 +82,7 @@ from meshtastic import (mesh_pb2, portnums_pb2, apponly_pb2, admin_pb2,
|
||||
environmental_measurement_pb2, remote_hardware_pb2,
|
||||
channel_pb2, radioconfig_pb2, util)
|
||||
|
||||
|
||||
# Note: To follow PEP224, comments should be after the module variable.
|
||||
|
||||
LOCAL_ADDR = "^local"
|
||||
@@ -129,6 +129,7 @@ def _onTextReceive(iface, asDict):
|
||||
#
|
||||
# Usually btw this problem is caused by apps sending binary data but setting the payload type to
|
||||
# text.
|
||||
logging.debug(f'in _onTextReceive() asDict:{asDict}')
|
||||
try:
|
||||
asBytes = asDict["decoded"]["payload"]
|
||||
asDict["decoded"]["text"] = asBytes.decode("utf-8")
|
||||
@@ -139,22 +140,30 @@ def _onTextReceive(iface, asDict):
|
||||
|
||||
def _onPositionReceive(iface, asDict):
|
||||
"""Special auto parsing for received messages"""
|
||||
p = asDict["decoded"]["position"]
|
||||
iface._fixupPosition(p)
|
||||
# update node DB as needed
|
||||
iface._getOrCreateByNum(asDict["from"])["position"] = p
|
||||
logging.debug(f'in _onPositionReceive() asDict:{asDict}')
|
||||
if 'decoded' in asDict:
|
||||
if 'position' in asDict['decoded'] and 'from' in asDict:
|
||||
p = asDict["decoded"]["position"]
|
||||
logging.debug(f'p:{p}')
|
||||
p = iface._fixupPosition(p)
|
||||
logging.debug(f'after fixup p:{p}')
|
||||
# update node DB as needed
|
||||
iface._getOrCreateByNum(asDict["from"])["position"] = p
|
||||
|
||||
|
||||
def _onNodeInfoReceive(iface, asDict):
|
||||
"""Special auto parsing for received messages"""
|
||||
p = asDict["decoded"]["user"]
|
||||
# decode user protobufs and update nodedb, provide decoded version as "position" in the published msg
|
||||
# update node DB as needed
|
||||
n = iface._getOrCreateByNum(asDict["from"])
|
||||
n["user"] = p
|
||||
# We now have a node ID, make sure it is uptodate in that table
|
||||
iface.nodes[p["id"]] = n
|
||||
_receiveInfoUpdate(iface, asDict)
|
||||
logging.debug(f'in _onNodeInfoReceive() asDict:{asDict}')
|
||||
if 'decoded' in asDict:
|
||||
if 'user' in asDict['decoded'] and 'from' in asDict:
|
||||
p = asDict["decoded"]["user"]
|
||||
# decode user protobufs and update nodedb, provide decoded version as "position" in the published msg
|
||||
# update node DB as needed
|
||||
n = iface._getOrCreateByNum(asDict["from"])
|
||||
n["user"] = p
|
||||
# We now have a node ID, make sure it is uptodate in that table
|
||||
iface.nodes[p["id"]] = n
|
||||
_receiveInfoUpdate(iface, asDict)
|
||||
|
||||
|
||||
def _receiveInfoUpdate(iface, asDict):
|
||||
|
||||
@@ -20,7 +20,6 @@ from meshtastic import portnums_pb2, channel_pb2, radioconfig_pb2
|
||||
from meshtastic.globals import Globals
|
||||
from meshtastic.__init__ import BROADCAST_ADDR
|
||||
|
||||
|
||||
def onReceive(packet, interface):
|
||||
"""Callback invoked when a packet arrives"""
|
||||
our_globals = Globals.getInstance()
|
||||
@@ -40,7 +39,7 @@ def onReceive(packet, interface):
|
||||
rxSnr = packet['rxSnr']
|
||||
hopLimit = packet['hopLimit']
|
||||
print(f"message: {msg}")
|
||||
reply = "got msg \'{}\' with rxSnr: {} and hopLimit: {}".format(msg, rxSnr, hopLimit)
|
||||
reply = f"got msg \'{msg}\' with rxSnr: {rxSnr} and hopLimit: {hopLimit}"
|
||||
print("Sending reply: ", reply)
|
||||
interface.sendText(reply)
|
||||
|
||||
@@ -48,7 +47,7 @@ def onReceive(packet, interface):
|
||||
print(f'Warning: There is no field {ex} in the packet.')
|
||||
|
||||
|
||||
def onConnection(interface, topic=pub.AUTO_TOPIC):
|
||||
def onConnection(interface, topic=pub.AUTO_TOPIC): # pylint: disable=W0613
|
||||
"""Callback invoked when we connect/disconnect from a radio"""
|
||||
print(f"Connection changed: {topic.getName()}")
|
||||
|
||||
@@ -56,48 +55,69 @@ def onConnection(interface, topic=pub.AUTO_TOPIC):
|
||||
def getPref(attributes, name):
|
||||
"""Get a channel or preferences value"""
|
||||
|
||||
camel_name = meshtastic.util.snake_to_camel(name)
|
||||
# Note: protobufs has the keys in snake_case, so snake internally
|
||||
snake_name = meshtastic.util.camel_to_snake(name)
|
||||
logging.debug(f'snake_name:{snake_name} camel_name:{camel_name}')
|
||||
logging.debug(f'use camel:{Globals.getInstance().get_camel_case()}')
|
||||
|
||||
objDesc = attributes.DESCRIPTOR
|
||||
field = objDesc.fields_by_name.get(name)
|
||||
field = objDesc.fields_by_name.get(snake_name)
|
||||
if not field:
|
||||
print(f"{attributes.__class__.__name__} does not have an attribute called {name}, so you can not get it.")
|
||||
if Globals.getInstance().get_camel_case():
|
||||
print(f"{attributes.__class__.__name__} does not have an attribute called {camel_name}, so you can not get it.")
|
||||
else:
|
||||
print(f"{attributes.__class__.__name__} does not have an attribute called {snake_name}, so you can not get it.")
|
||||
print(f"Choices in sorted order are:")
|
||||
names = []
|
||||
for f in objDesc.fields:
|
||||
names.append(f'{f.name}')
|
||||
tmp_name = f'{f.name}'
|
||||
if Globals.getInstance().get_camel_case():
|
||||
tmp_name = meshtastic.util.snake_to_camel(tmp_name)
|
||||
names.append(tmp_name)
|
||||
for temp_name in sorted(names):
|
||||
print(f" {temp_name}")
|
||||
return
|
||||
|
||||
# okay - try to read the value
|
||||
try:
|
||||
try:
|
||||
val = getattr(attributes, name)
|
||||
except TypeError:
|
||||
# The getter didn't like our arg type guess try again as a string
|
||||
val = getattr(attributes, name)
|
||||
# read the value
|
||||
val = getattr(attributes, snake_name)
|
||||
|
||||
# succeeded!
|
||||
print(f"{name}: {str(val)}")
|
||||
except Exception as ex:
|
||||
print(f"Can't get {name} due to {ex}")
|
||||
if Globals.getInstance().get_camel_case():
|
||||
print(f"{camel_name}: {str(val)}")
|
||||
logging.debug(f"{camel_name}: {str(val)}")
|
||||
else:
|
||||
print(f"{snake_name}: {str(val)}")
|
||||
logging.debug(f"{snake_name}: {str(val)}")
|
||||
|
||||
|
||||
def setPref(attributes, name, valStr):
|
||||
"""Set a channel or preferences value"""
|
||||
|
||||
snake_name = meshtastic.util.camel_to_snake(name)
|
||||
camel_name = meshtastic.util.snake_to_camel(name)
|
||||
logging.debug(f'snake_name:{snake_name}')
|
||||
logging.debug(f'camel_name:{camel_name}')
|
||||
|
||||
objDesc = attributes.DESCRIPTOR
|
||||
field = objDesc.fields_by_name.get(name)
|
||||
field = objDesc.fields_by_name.get(snake_name)
|
||||
if not field:
|
||||
print(f"{attributes.__class__.__name__} does not have an attribute called {name}, so you can not set it.")
|
||||
if Globals.getInstance().get_camel_case():
|
||||
print(f"{attributes.__class__.__name__} does not have an attribute called {camel_name}, so you can not set it.")
|
||||
else:
|
||||
print(f"{attributes.__class__.__name__} does not have an attribute called {snake_name}, so you can not set it.")
|
||||
print(f"Choices in sorted order are:")
|
||||
names = []
|
||||
for f in objDesc.fields:
|
||||
names.append(f'{f.name}')
|
||||
tmp_name = f'{f.name}'
|
||||
if Globals.getInstance().get_camel_case():
|
||||
tmp_name = meshtastic.util.snake_to_camel(tmp_name)
|
||||
names.append(tmp_name)
|
||||
for temp_name in sorted(names):
|
||||
print(f" {temp_name}")
|
||||
return
|
||||
|
||||
val = meshtastic.util.fromStr(valStr)
|
||||
logging.debug(f'valStr:{valStr} val:{val}')
|
||||
|
||||
enumType = field.enum_type
|
||||
# pylint: disable=C0123
|
||||
@@ -107,27 +127,30 @@ def setPref(attributes, name, valStr):
|
||||
if e:
|
||||
val = e.number
|
||||
else:
|
||||
print(f"{name} does not have an enum called {val}, so you can not set it.")
|
||||
if Globals.getInstance().get_camel_case():
|
||||
print(f"{camel_name} does not have an enum called {val}, so you can not set it.")
|
||||
else:
|
||||
print(f"{snake_name} does not have an enum called {val}, so you can not set it.")
|
||||
print(f"Choices in sorted order are:")
|
||||
names = []
|
||||
for f in enumType.values:
|
||||
names.append(f'{f.name}')
|
||||
tmp_name = f'{f.name}'
|
||||
if Globals.getInstance().get_camel_case():
|
||||
tmp_name = meshtastic.util.snake_to_camel(tmp_name)
|
||||
names.append(name)
|
||||
for temp_name in sorted(names):
|
||||
print(f" {temp_name}")
|
||||
return
|
||||
|
||||
# okay - try to read the value
|
||||
try:
|
||||
try:
|
||||
setattr(attributes, name, val)
|
||||
except TypeError:
|
||||
# The setter didn't like our arg type guess try again as a string
|
||||
setattr(attributes, name, valStr)
|
||||
setattr(attributes, snake_name, val)
|
||||
except TypeError:
|
||||
# The setter didn't like our arg type guess try again as a string
|
||||
setattr(attributes, snake_name, valStr)
|
||||
|
||||
# succeeded!
|
||||
print(f"Set {name} to {valStr}")
|
||||
except Exception as ex:
|
||||
print(f"Can't set {name} due to {ex}")
|
||||
if Globals.getInstance().get_camel_case():
|
||||
print(f"Set {camel_name} to {valStr}")
|
||||
else:
|
||||
print(f"Set {snake_name} to {valStr}")
|
||||
|
||||
|
||||
def onConnected(interface):
|
||||
@@ -192,7 +215,7 @@ def onConnected(interface):
|
||||
|
||||
else:
|
||||
print(f"Setting position fields to {allFields}")
|
||||
setPref(prefs, 'position_flags', ('%d' % allFields))
|
||||
setPref(prefs, 'position_flags', f'{allFields:d}')
|
||||
print("Writing modified preferences to device")
|
||||
interface.getNode(args.dest).writeConfig()
|
||||
|
||||
@@ -311,6 +334,10 @@ def onConnected(interface):
|
||||
print("Setting channel url to", configuration['channel_url'])
|
||||
interface.getNode(args.dest).setURL(configuration['channel_url'])
|
||||
|
||||
if 'channelUrl' in configuration:
|
||||
print("Setting channel url to", configuration['channelUrl'])
|
||||
interface.getNode(args.dest).setURL(configuration['channelUrl'])
|
||||
|
||||
if 'location' in configuration:
|
||||
alt = 0
|
||||
lat = 0.0
|
||||
@@ -340,6 +367,13 @@ def onConnected(interface):
|
||||
print("Writing modified preferences to device")
|
||||
interface.getNode(args.dest).writeConfig()
|
||||
|
||||
if 'userPrefs' in configuration:
|
||||
prefs = interface.getNode(args.dest).radioConfig.preferences
|
||||
for pref in configuration['userPrefs']:
|
||||
setPref(prefs, pref, str(configuration['userPrefs'][pref]))
|
||||
print("Writing modified preferences to device")
|
||||
interface.getNode(args.dest).writeConfig()
|
||||
|
||||
if args.export_config:
|
||||
# export the configuration (the opposite of '--configure')
|
||||
closeNow = True
|
||||
@@ -548,7 +582,10 @@ def export_config(interface):
|
||||
if owner:
|
||||
config += f"owner: {owner}\n\n"
|
||||
if channel_url:
|
||||
config += f"channel_url: {channel_url}\n\n"
|
||||
if Globals.getInstance().get_camel_case():
|
||||
config += f"channelUrl: {channel_url}\n\n"
|
||||
else:
|
||||
config += f"channel_url: {channel_url}\n\n"
|
||||
if lat or lon or alt:
|
||||
config += "location:\n"
|
||||
if lat:
|
||||
@@ -561,9 +598,16 @@ def export_config(interface):
|
||||
preferences = f'{interface.localNode.radioConfig.preferences}'
|
||||
prefs = preferences.splitlines()
|
||||
if prefs:
|
||||
config += "user_prefs:\n"
|
||||
if Globals.getInstance().get_camel_case():
|
||||
config += "userPrefs:\n"
|
||||
else:
|
||||
config += "user_prefs:\n"
|
||||
for pref in prefs:
|
||||
config += f" {meshtastic.util.quoteBooleans(pref)}\n"
|
||||
if Globals.getInstance().get_camel_case():
|
||||
# Note: This may not work if the value has '_'
|
||||
config += f" {meshtastic.util.snake_to_camel(meshtastic.util.quoteBooleans(pref))}\n"
|
||||
else:
|
||||
config += f" {meshtastic.util.quoteBooleans(pref)}\n"
|
||||
print(config)
|
||||
return config
|
||||
|
||||
@@ -692,10 +736,12 @@ def initParser():
|
||||
action="store_true")
|
||||
|
||||
parser.add_argument(
|
||||
"--get", help="Get a preferences field. Use an invalid field such as '0' to get a list of all fields.", nargs=1, action='append')
|
||||
"--get", help=("Get a preferences field. Use an invalid field such as '0' to get a list of all fields."
|
||||
" Can use either snake_case or camelCase format. (ex: 'ls_secs' or 'lsSecs')"),
|
||||
nargs=1, action='append')
|
||||
|
||||
parser.add_argument(
|
||||
"--set", help="Set a preferences field", nargs=2, action='append')
|
||||
"--set", help="Set a preferences field. Can use either snake_case or camelCase format. (ex: 'ls_secs' or 'lsSecs')", nargs=2, action='append')
|
||||
|
||||
parser.add_argument(
|
||||
"--seturl", help="Set a channel URL", action="store")
|
||||
|
||||
@@ -1,10 +1,16 @@
|
||||
"""Bluetooth interface
|
||||
"""
|
||||
import logging
|
||||
import pygatt
|
||||
|
||||
import platform
|
||||
|
||||
from meshtastic.mesh_interface import MeshInterface
|
||||
from meshtastic.util import our_exit
|
||||
|
||||
if platform.system() == 'Linux':
|
||||
# pylint: disable=E0401
|
||||
import pygatt
|
||||
|
||||
|
||||
|
||||
# Our standard BLE characteristics
|
||||
TORADIO_UUID = "f75c76d2-129e-4dad-a1dd-7866124401e7"
|
||||
@@ -16,6 +22,8 @@ class BLEInterface(MeshInterface):
|
||||
"""A not quite ready - FIXME - BLE interface to devices"""
|
||||
|
||||
def __init__(self, address, noProto=False, debugOut=None):
|
||||
if platform.system() != 'Linux':
|
||||
our_exit("Linux is the only platform with experimental BLE support.", 1)
|
||||
self.address = address
|
||||
if not noProto:
|
||||
self.adapter = pygatt.GATTToolBackend() # BGAPIBackend()
|
||||
@@ -31,7 +39,7 @@ class BLEInterface(MeshInterface):
|
||||
|
||||
self._readFromRadio() # read the initial responses
|
||||
|
||||
def handle_data(handle, data):
|
||||
def handle_data(handle, data): # pylint: disable=W0613
|
||||
self._handleFromRadio(data)
|
||||
|
||||
if self.device:
|
||||
|
||||
@@ -30,6 +30,8 @@ class Globals:
|
||||
self.channel_index = None
|
||||
self.logfile = None
|
||||
self.tunnelInstance = None
|
||||
# TODO: to migrate to camel_case for v1.3 change this value to True
|
||||
self.camel_case = False
|
||||
|
||||
def reset(self):
|
||||
"""Reset all of our globals. If you add a member, add it to this method, too."""
|
||||
@@ -38,6 +40,8 @@ class Globals:
|
||||
self.channel_index = None
|
||||
self.logfile = None
|
||||
self.tunnelInstance = None
|
||||
# TODO: to migrate to camel_case for v1.3 change this value to True
|
||||
self.camel_case = False
|
||||
|
||||
# setters
|
||||
def set_args(self, args):
|
||||
@@ -60,6 +64,10 @@ class Globals:
|
||||
"""Set the tunnelInstance"""
|
||||
self.tunnelInstance = tunnelInstance
|
||||
|
||||
def set_camel_case(self):
|
||||
"""Force using camelCase for things like prefs/set/set"""
|
||||
self.camel_case = True
|
||||
|
||||
# getters
|
||||
def get_args(self):
|
||||
"""Get args"""
|
||||
@@ -80,3 +88,7 @@ class Globals:
|
||||
def get_tunnelInstance(self):
|
||||
"""Get tunnelInstance"""
|
||||
return self.tunnelInstance
|
||||
|
||||
def get_camel_case(self):
|
||||
"""Get whether or not to use camelCase"""
|
||||
return self.camel_case
|
||||
|
||||
@@ -73,7 +73,7 @@ class MeshInterface:
|
||||
logging.error(f'Traceback: {traceback}')
|
||||
self.close()
|
||||
|
||||
def showInfo(self, file=sys.stdout):
|
||||
def showInfo(self, file=sys.stdout): # pylint: disable=W0613
|
||||
"""Show human readable summary about this object"""
|
||||
owner = f"Owner: {self.getLongName()} ({self.getShortName()})"
|
||||
myinfo = ''
|
||||
@@ -100,7 +100,7 @@ class MeshInterface:
|
||||
print(infos)
|
||||
return infos
|
||||
|
||||
def showNodes(self, includeSelf=True, file=sys.stdout):
|
||||
def showNodes(self, includeSelf=True, file=sys.stdout): # pylint: disable=W0613
|
||||
"""Show table summary of nodes in mesh"""
|
||||
def formatFloat(value, precision=2, unit=''):
|
||||
"""Format a float value with precsion."""
|
||||
|
||||
@@ -8,8 +8,6 @@ from meshtastic import portnums_pb2, apponly_pb2, admin_pb2, channel_pb2
|
||||
from meshtastic.util import pskToString, stripnl, Timeout, our_exit, fromPSK
|
||||
|
||||
|
||||
|
||||
|
||||
class Node:
|
||||
"""A model of a (local or remote) node in the mesh
|
||||
|
||||
|
||||
@@ -62,8 +62,7 @@ class StreamInterface(MeshInterface):
|
||||
# because we want to ensure it is looking for START1)
|
||||
p = bytearray([START2] * 32)
|
||||
self._writeBytes(p)
|
||||
if not self.noProto:
|
||||
time.sleep(0.1) # wait 100ms to give device time to start running
|
||||
time.sleep(0.1) # wait 100ms to give device time to start running
|
||||
|
||||
self._rxThread.start()
|
||||
|
||||
@@ -90,8 +89,7 @@ class StreamInterface(MeshInterface):
|
||||
self.stream.write(b)
|
||||
self.stream.flush()
|
||||
# we sleep here to give the TBeam a chance to work
|
||||
if not self.noProto:
|
||||
time.sleep(0.1)
|
||||
time.sleep(0.1)
|
||||
|
||||
def _readBytes(self, length):
|
||||
"""Read an array of bytes from our stream"""
|
||||
|
||||
@@ -33,6 +33,12 @@ class TCPInterface(StreamInterface):
|
||||
StreamInterface.__init__(self, debugOut=debugOut, noProto=noProto,
|
||||
connectNow=connectNow)
|
||||
|
||||
def _socket_shutdown(self):
|
||||
"""Shutdown the socket.
|
||||
Note: Broke out this line so the exception could be unit tested.
|
||||
"""
|
||||
self.socket.shutdown(socket.SHUT_RDWR)
|
||||
|
||||
def myConnect(self):
|
||||
"""Connect to socket"""
|
||||
server_address = (self.hostname, self.portNumber)
|
||||
@@ -48,7 +54,7 @@ class TCPInterface(StreamInterface):
|
||||
self._wantExit = True
|
||||
if not self.socket is None:
|
||||
try:
|
||||
self.socket.shutdown(socket.SHUT_RDWR)
|
||||
self._socket_shutdown()
|
||||
except:
|
||||
pass # Ignore errors in shutdown, because we might have a race with the server
|
||||
self.socket.close()
|
||||
|
||||
@@ -63,6 +63,7 @@ def testSend(fromInterface, toInterface, isBroadcast=False, asBinary=False, want
|
||||
Returns:
|
||||
boolean -- True for success
|
||||
"""
|
||||
# pylint: disable=W0603
|
||||
global receivedPackets
|
||||
receivedPackets = []
|
||||
fromNode = fromInterface.myInfo.my_node_num
|
||||
@@ -74,6 +75,7 @@ def testSend(fromInterface, toInterface, isBroadcast=False, asBinary=False, want
|
||||
|
||||
logging.debug(
|
||||
f"Sending test wantAck={wantAck} packet from {fromNode} to {toNode}")
|
||||
# pylint: disable=W0603
|
||||
global sendingInterface
|
||||
sendingInterface = fromInterface
|
||||
if not asBinary:
|
||||
@@ -94,6 +96,7 @@ def runTests(numTests=50, wantAck=False, maxFailures=0):
|
||||
numFail = 0
|
||||
numSuccess = 0
|
||||
for _ in range(numTests):
|
||||
# pylint: disable=W0603
|
||||
global testNumber
|
||||
testNumber = testNumber + 1
|
||||
isBroadcast = True
|
||||
@@ -152,6 +155,7 @@ def testAll(numTests=5):
|
||||
|
||||
pub.subscribe(onConnection, "meshtastic.connection")
|
||||
pub.subscribe(onReceive, "meshtastic.receive")
|
||||
# pylint: disable=W0603
|
||||
global interfaces
|
||||
interfaces = list(map(lambda port: SerialInterface(
|
||||
port, debugOut=openDebugLog(port), connectNow=True), ports))
|
||||
|
||||
@@ -1,12 +1,15 @@
|
||||
"""Meshtastic unit tests for ble_interface.py"""
|
||||
|
||||
|
||||
from unittest.mock import patch
|
||||
import pytest
|
||||
|
||||
from ..ble_interface import BLEInterface
|
||||
|
||||
@pytest.mark.unit
|
||||
def test_BLEInterface():
|
||||
@patch('platform.system', return_value='Linux')
|
||||
def test_BLEInterface(mock_platform):
|
||||
"""Test that we can instantiate a BLEInterface"""
|
||||
iface = BLEInterface('foo', debugOut=True, noProto=True)
|
||||
iface.close()
|
||||
mock_platform.assert_called()
|
||||
|
||||
61
meshtastic/tests/test_init.py
Normal file
61
meshtastic/tests/test_init.py
Normal file
@@ -0,0 +1,61 @@
|
||||
"""Meshtastic unit tests for __init__.py"""
|
||||
|
||||
import re
|
||||
import logging
|
||||
|
||||
from unittest.mock import MagicMock
|
||||
import pytest
|
||||
|
||||
from meshtastic.__init__ import _onTextReceive, _onPositionReceive, _onNodeInfoReceive
|
||||
from ..serial_interface import SerialInterface
|
||||
from ..globals import Globals
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
def test_init_onTextReceive_with_exception(caplog):
|
||||
"""Test _onTextReceive"""
|
||||
args = MagicMock()
|
||||
Globals.getInstance().set_args(args)
|
||||
iface = MagicMock(autospec=SerialInterface)
|
||||
packet = {}
|
||||
with caplog.at_level(logging.DEBUG):
|
||||
_onTextReceive(iface, packet)
|
||||
assert re.search(r'in _onTextReceive', caplog.text, re.MULTILINE)
|
||||
assert re.search(r'Malformatted', caplog.text, re.MULTILINE)
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
def test_init_onPositionReceive(caplog):
|
||||
"""Test _onPositionReceive"""
|
||||
args = MagicMock()
|
||||
Globals.getInstance().set_args(args)
|
||||
iface = MagicMock(autospec=SerialInterface)
|
||||
packet = {
|
||||
'from': 'foo',
|
||||
'decoded': {
|
||||
'position': {}
|
||||
}
|
||||
}
|
||||
with caplog.at_level(logging.DEBUG):
|
||||
_onPositionReceive(iface, packet)
|
||||
assert re.search(r'in _onPositionReceive', caplog.text, re.MULTILINE)
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
def test_init_onNodeInfoReceive(caplog, iface_with_nodes):
|
||||
"""Test _onNodeInfoReceive"""
|
||||
args = MagicMock()
|
||||
Globals.getInstance().set_args(args)
|
||||
iface = iface_with_nodes
|
||||
iface.myInfo.my_node_num = 2475227164
|
||||
packet = {
|
||||
'from': 'foo',
|
||||
'decoded': {
|
||||
'user': {
|
||||
'id': 'bar',
|
||||
},
|
||||
}
|
||||
}
|
||||
with caplog.at_level(logging.DEBUG):
|
||||
_onNodeInfoReceive(iface, packet)
|
||||
assert re.search(r'in _onNodeInfoReceive', caplog.text, re.MULTILINE)
|
||||
File diff suppressed because it is too large
Load Diff
@@ -15,7 +15,8 @@ from ..util import Timeout
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
def test_MeshInterface(capsys, reset_globals):
|
||||
@pytest.mark.usefixtures("reset_globals")
|
||||
def test_MeshInterface(capsys):
|
||||
"""Test that we can instantiate a MeshInterface"""
|
||||
iface = MeshInterface(noProto=True)
|
||||
anode = Node('foo', 'bar')
|
||||
@@ -56,7 +57,8 @@ def test_MeshInterface(capsys, reset_globals):
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
def test_getMyUser(reset_globals, iface_with_nodes):
|
||||
@pytest.mark.usefixtures("reset_globals")
|
||||
def test_getMyUser(iface_with_nodes):
|
||||
"""Test getMyUser()"""
|
||||
iface = iface_with_nodes
|
||||
iface.myInfo.my_node_num = 2475227164
|
||||
@@ -66,7 +68,8 @@ def test_getMyUser(reset_globals, iface_with_nodes):
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
def test_getLongName(reset_globals, iface_with_nodes):
|
||||
@pytest.mark.usefixtures("reset_globals")
|
||||
def test_getLongName(iface_with_nodes):
|
||||
"""Test getLongName()"""
|
||||
iface = iface_with_nodes
|
||||
iface.myInfo.my_node_num = 2475227164
|
||||
@@ -75,7 +78,8 @@ def test_getLongName(reset_globals, iface_with_nodes):
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
def test_getShortName(reset_globals, iface_with_nodes):
|
||||
@pytest.mark.usefixtures("reset_globals")
|
||||
def test_getShortName(iface_with_nodes):
|
||||
"""Test getShortName()."""
|
||||
iface = iface_with_nodes
|
||||
iface.myInfo.my_node_num = 2475227164
|
||||
@@ -84,7 +88,8 @@ def test_getShortName(reset_globals, iface_with_nodes):
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
def test_handlePacketFromRadio_no_from(capsys, reset_globals):
|
||||
@pytest.mark.usefixtures("reset_globals")
|
||||
def test_handlePacketFromRadio_no_from(capsys):
|
||||
"""Test _handlePacketFromRadio with no 'from' in the mesh packet."""
|
||||
iface = MeshInterface(noProto=True)
|
||||
meshPacket = mesh_pb2.MeshPacket()
|
||||
@@ -95,7 +100,8 @@ def test_handlePacketFromRadio_no_from(capsys, reset_globals):
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
def test_handlePacketFromRadio_with_a_portnum(caplog, reset_globals):
|
||||
@pytest.mark.usefixtures("reset_globals")
|
||||
def test_handlePacketFromRadio_with_a_portnum(caplog):
|
||||
"""Test _handlePacketFromRadio with a portnum
|
||||
Since we have an attribute called 'from', we cannot simply 'set' it.
|
||||
Had to implement a hack just to be able to test some code.
|
||||
@@ -110,7 +116,8 @@ def test_handlePacketFromRadio_with_a_portnum(caplog, reset_globals):
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
def test_handlePacketFromRadio_no_portnum(caplog, reset_globals):
|
||||
@pytest.mark.usefixtures("reset_globals")
|
||||
def test_handlePacketFromRadio_no_portnum(caplog):
|
||||
"""Test _handlePacketFromRadio without a portnum"""
|
||||
iface = MeshInterface(noProto=True)
|
||||
meshPacket = mesh_pb2.MeshPacket()
|
||||
@@ -121,7 +128,8 @@ def test_handlePacketFromRadio_no_portnum(caplog, reset_globals):
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
def test_getNode_with_local(reset_globals):
|
||||
@pytest.mark.usefixtures("reset_globals")
|
||||
def test_getNode_with_local():
|
||||
"""Test getNode"""
|
||||
iface = MeshInterface(noProto=True)
|
||||
anode = iface.getNode(LOCAL_ADDR)
|
||||
@@ -129,7 +137,8 @@ def test_getNode_with_local(reset_globals):
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
def test_getNode_not_local(reset_globals, caplog):
|
||||
@pytest.mark.usefixtures("reset_globals")
|
||||
def test_getNode_not_local(caplog):
|
||||
"""Test getNode not local"""
|
||||
iface = MeshInterface(noProto=True)
|
||||
anode = MagicMock(autospec=Node)
|
||||
@@ -141,7 +150,8 @@ def test_getNode_not_local(reset_globals, caplog):
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
def test_getNode_not_local_timeout(reset_globals, capsys):
|
||||
@pytest.mark.usefixtures("reset_globals")
|
||||
def test_getNode_not_local_timeout(capsys):
|
||||
"""Test getNode not local, simulate timeout"""
|
||||
iface = MeshInterface(noProto=True)
|
||||
anode = MagicMock(autospec=Node)
|
||||
@@ -157,7 +167,8 @@ def test_getNode_not_local_timeout(reset_globals, capsys):
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
def test_sendPosition(reset_globals, caplog):
|
||||
@pytest.mark.usefixtures("reset_globals")
|
||||
def test_sendPosition(caplog):
|
||||
"""Test sendPosition"""
|
||||
iface = MeshInterface(noProto=True)
|
||||
with caplog.at_level(logging.DEBUG):
|
||||
@@ -167,7 +178,8 @@ def test_sendPosition(reset_globals, caplog):
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
def test_close_with_heartbeatTimer(reset_globals, caplog):
|
||||
@pytest.mark.usefixtures("reset_globals")
|
||||
def test_close_with_heartbeatTimer(caplog):
|
||||
"""Test close() with heartbeatTimer"""
|
||||
iface = MeshInterface(noProto=True)
|
||||
anode = Node('foo', 'bar')
|
||||
@@ -183,7 +195,8 @@ def test_close_with_heartbeatTimer(reset_globals, caplog):
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
def test_handleFromRadio_empty_payload(reset_globals, caplog):
|
||||
@pytest.mark.usefixtures("reset_globals")
|
||||
def test_handleFromRadio_empty_payload(caplog):
|
||||
"""Test _handleFromRadio"""
|
||||
iface = MeshInterface(noProto=True)
|
||||
with caplog.at_level(logging.DEBUG):
|
||||
@@ -193,7 +206,8 @@ def test_handleFromRadio_empty_payload(reset_globals, caplog):
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
def test_handleFromRadio_with_my_info(reset_globals, caplog):
|
||||
@pytest.mark.usefixtures("reset_globals")
|
||||
def test_handleFromRadio_with_my_info(caplog):
|
||||
"""Test _handleFromRadio with my_info"""
|
||||
# Note: I captured the '--debug --info' for the bytes below.
|
||||
# It "translates" to this:
|
||||
@@ -218,7 +232,8 @@ def test_handleFromRadio_with_my_info(reset_globals, caplog):
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
def test_handleFromRadio_with_node_info(reset_globals, caplog, capsys):
|
||||
@pytest.mark.usefixtures("reset_globals")
|
||||
def test_handleFromRadio_with_node_info(caplog, capsys):
|
||||
"""Test _handleFromRadio with node_info"""
|
||||
# Note: I captured the '--debug --info' for the bytes below.
|
||||
# It "translates" to this:
|
||||
@@ -254,7 +269,8 @@ def test_handleFromRadio_with_node_info(reset_globals, caplog, capsys):
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
def test_handleFromRadio_with_node_info_tbeam1(reset_globals, caplog, capsys):
|
||||
@pytest.mark.usefixtures("reset_globals")
|
||||
def test_handleFromRadio_with_node_info_tbeam1(caplog, capsys):
|
||||
"""Test _handleFromRadio with node_info"""
|
||||
# Note: Captured the '--debug --info' for the bytes below.
|
||||
# pylint: disable=C0301
|
||||
@@ -277,7 +293,8 @@ def test_handleFromRadio_with_node_info_tbeam1(reset_globals, caplog, capsys):
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
def test_handleFromRadio_with_node_info_tbeam_with_bad_data(reset_globals, caplog, capsys):
|
||||
@pytest.mark.usefixtures("reset_globals")
|
||||
def test_handleFromRadio_with_node_info_tbeam_with_bad_data(caplog):
|
||||
"""Test _handleFromRadio with node_info with some bad data (issue#172) - ensure we do not throw exception"""
|
||||
# Note: Captured the '--debug --info' for the bytes below.
|
||||
from_radio_bytes = b'"\x17\x08\xdc\x8a\x8a\xae\x02\x12\x08"\x06\x00\x00\x00\x00\x00\x00\x1a\x00=\x00\x00\xb8@'
|
||||
@@ -288,7 +305,8 @@ def test_handleFromRadio_with_node_info_tbeam_with_bad_data(reset_globals, caplo
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
def test_MeshInterface_sendToRadioImpl(caplog, reset_globals):
|
||||
@pytest.mark.usefixtures("reset_globals")
|
||||
def test_MeshInterface_sendToRadioImpl(caplog):
|
||||
"""Test _sendToRadioImp()"""
|
||||
iface = MeshInterface(noProto=True)
|
||||
with caplog.at_level(logging.DEBUG):
|
||||
@@ -298,7 +316,8 @@ def test_MeshInterface_sendToRadioImpl(caplog, reset_globals):
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
def test_MeshInterface_sendToRadio_no_proto(caplog, reset_globals):
|
||||
@pytest.mark.usefixtures("reset_globals")
|
||||
def test_MeshInterface_sendToRadio_no_proto(caplog):
|
||||
"""Test sendToRadio()"""
|
||||
iface = MeshInterface()
|
||||
with caplog.at_level(logging.DEBUG):
|
||||
@@ -308,7 +327,8 @@ def test_MeshInterface_sendToRadio_no_proto(caplog, reset_globals):
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
def test_sendData_too_long(caplog, reset_globals):
|
||||
@pytest.mark.usefixtures("reset_globals")
|
||||
def test_sendData_too_long(caplog):
|
||||
"""Test when data payload is too big"""
|
||||
iface = MeshInterface(noProto=True)
|
||||
some_large_text = b'This is a long text that will be too long for send text.'
|
||||
@@ -332,7 +352,8 @@ def test_sendData_too_long(caplog, reset_globals):
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
def test_sendData_unknown_app(capsys, reset_globals):
|
||||
@pytest.mark.usefixtures("reset_globals")
|
||||
def test_sendData_unknown_app(capsys):
|
||||
"""Test sendData when unknown app"""
|
||||
iface = MeshInterface(noProto=True)
|
||||
with pytest.raises(SystemExit) as pytest_wrapped_e:
|
||||
@@ -345,7 +366,8 @@ def test_sendData_unknown_app(capsys, reset_globals):
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
def test_sendPosition_with_a_position(caplog, reset_globals):
|
||||
@pytest.mark.usefixtures("reset_globals")
|
||||
def test_sendPosition_with_a_position(caplog):
|
||||
"""Test sendPosition when lat/long/alt"""
|
||||
iface = MeshInterface(noProto=True)
|
||||
with caplog.at_level(logging.DEBUG):
|
||||
@@ -356,7 +378,8 @@ def test_sendPosition_with_a_position(caplog, reset_globals):
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
def test_sendPacket_with_no_destination(capsys, reset_globals):
|
||||
@pytest.mark.usefixtures("reset_globals")
|
||||
def test_sendPacket_with_no_destination(capsys):
|
||||
"""Test _sendPacket()"""
|
||||
iface = MeshInterface(noProto=True)
|
||||
with pytest.raises(SystemExit) as pytest_wrapped_e:
|
||||
@@ -369,7 +392,8 @@ def test_sendPacket_with_no_destination(capsys, reset_globals):
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
def test_sendPacket_with_destination_as_int(caplog, reset_globals):
|
||||
@pytest.mark.usefixtures("reset_globals")
|
||||
def test_sendPacket_with_destination_as_int(caplog):
|
||||
"""Test _sendPacket() with int as a destination"""
|
||||
iface = MeshInterface(noProto=True)
|
||||
with caplog.at_level(logging.DEBUG):
|
||||
@@ -379,7 +403,8 @@ def test_sendPacket_with_destination_as_int(caplog, reset_globals):
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
def test_sendPacket_with_destination_starting_with_a_bang(caplog, reset_globals):
|
||||
@pytest.mark.usefixtures("reset_globals")
|
||||
def test_sendPacket_with_destination_starting_with_a_bang(caplog):
|
||||
"""Test _sendPacket() with int as a destination"""
|
||||
iface = MeshInterface(noProto=True)
|
||||
with caplog.at_level(logging.DEBUG):
|
||||
@@ -389,7 +414,8 @@ def test_sendPacket_with_destination_starting_with_a_bang(caplog, reset_globals)
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
def test_sendPacket_with_destination_as_BROADCAST_ADDR(caplog, reset_globals):
|
||||
@pytest.mark.usefixtures("reset_globals")
|
||||
def test_sendPacket_with_destination_as_BROADCAST_ADDR(caplog):
|
||||
"""Test _sendPacket() with BROADCAST_ADDR as a destination"""
|
||||
iface = MeshInterface(noProto=True)
|
||||
with caplog.at_level(logging.DEBUG):
|
||||
@@ -399,7 +425,8 @@ def test_sendPacket_with_destination_as_BROADCAST_ADDR(caplog, reset_globals):
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
def test_sendPacket_with_destination_as_LOCAL_ADDR_no_myInfo(capsys, reset_globals):
|
||||
@pytest.mark.usefixtures("reset_globals")
|
||||
def test_sendPacket_with_destination_as_LOCAL_ADDR_no_myInfo(capsys):
|
||||
"""Test _sendPacket() with LOCAL_ADDR as a destination with no myInfo"""
|
||||
iface = MeshInterface(noProto=True)
|
||||
with pytest.raises(SystemExit) as pytest_wrapped_e:
|
||||
@@ -413,11 +440,13 @@ def test_sendPacket_with_destination_as_LOCAL_ADDR_no_myInfo(capsys, reset_globa
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
def test_sendPacket_with_destination_as_LOCAL_ADDR_with_myInfo(caplog, reset_globals):
|
||||
@pytest.mark.usefixtures("reset_globals")
|
||||
def test_sendPacket_with_destination_as_LOCAL_ADDR_with_myInfo(caplog):
|
||||
"""Test _sendPacket() with LOCAL_ADDR as a destination with myInfo"""
|
||||
iface = MeshInterface(noProto=True)
|
||||
myInfo = MagicMock()
|
||||
iface.myInfo = myInfo
|
||||
iface.myInfo.my_node_num = 1
|
||||
with caplog.at_level(logging.DEBUG):
|
||||
meshPacket = mesh_pb2.MeshPacket()
|
||||
iface._sendPacket(meshPacket, destinationId=LOCAL_ADDR)
|
||||
@@ -425,7 +454,8 @@ def test_sendPacket_with_destination_as_LOCAL_ADDR_with_myInfo(caplog, reset_glo
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
def test_sendPacket_with_destination_is_blank_with_nodes(capsys, reset_globals, iface_with_nodes):
|
||||
@pytest.mark.usefixtures("reset_globals")
|
||||
def test_sendPacket_with_destination_is_blank_with_nodes(capsys, iface_with_nodes):
|
||||
"""Test _sendPacket() with '' as a destination with myInfo"""
|
||||
iface = iface_with_nodes
|
||||
meshPacket = mesh_pb2.MeshPacket()
|
||||
@@ -439,7 +469,8 @@ def test_sendPacket_with_destination_is_blank_with_nodes(capsys, reset_globals,
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
def test_sendPacket_with_destination_is_blank_without_nodes(caplog, reset_globals, iface_with_nodes):
|
||||
@pytest.mark.usefixtures("reset_globals")
|
||||
def test_sendPacket_with_destination_is_blank_without_nodes(caplog, iface_with_nodes):
|
||||
"""Test _sendPacket() with '' as a destination with myInfo"""
|
||||
iface = iface_with_nodes
|
||||
iface.nodes = None
|
||||
@@ -450,7 +481,8 @@ def test_sendPacket_with_destination_is_blank_without_nodes(caplog, reset_global
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
def test_getMyNodeInfo(reset_globals):
|
||||
@pytest.mark.usefixtures("reset_globals")
|
||||
def test_getMyNodeInfo():
|
||||
"""Test getMyNodeInfo()"""
|
||||
iface = MeshInterface(noProto=True)
|
||||
anode = iface.getNode(LOCAL_ADDR)
|
||||
@@ -464,7 +496,8 @@ def test_getMyNodeInfo(reset_globals):
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
def test_generatePacketId(capsys, reset_globals):
|
||||
@pytest.mark.usefixtures("reset_globals")
|
||||
def test_generatePacketId(capsys):
|
||||
"""Test _generatePacketId() when no currentPacketId (not connected)"""
|
||||
iface = MeshInterface(noProto=True)
|
||||
# not sure when this condition would ever happen... but we can simulate it
|
||||
@@ -479,7 +512,8 @@ def test_generatePacketId(capsys, reset_globals):
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
def test_fixupPosition_empty_pos(capsys, reset_globals):
|
||||
@pytest.mark.usefixtures("reset_globals")
|
||||
def test_fixupPosition_empty_pos():
|
||||
"""Test _fixupPosition()"""
|
||||
iface = MeshInterface(noProto=True)
|
||||
pos = {}
|
||||
@@ -488,7 +522,8 @@ def test_fixupPosition_empty_pos(capsys, reset_globals):
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
def test_fixupPosition_no_changes_needed(capsys, reset_globals):
|
||||
@pytest.mark.usefixtures("reset_globals")
|
||||
def test_fixupPosition_no_changes_needed():
|
||||
"""Test _fixupPosition()"""
|
||||
iface = MeshInterface(noProto=True)
|
||||
pos = {"latitude": 101, "longitude": 102}
|
||||
@@ -497,7 +532,8 @@ def test_fixupPosition_no_changes_needed(capsys, reset_globals):
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
def test_fixupPosition(capsys, reset_globals):
|
||||
@pytest.mark.usefixtures("reset_globals")
|
||||
def test_fixupPosition():
|
||||
"""Test _fixupPosition()"""
|
||||
iface = MeshInterface(noProto=True)
|
||||
pos = {"latitudeI": 1010000000, "longitudeI": 1020000000}
|
||||
@@ -509,7 +545,8 @@ def test_fixupPosition(capsys, reset_globals):
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
def test_nodeNumToId(capsys, reset_globals, iface_with_nodes):
|
||||
@pytest.mark.usefixtures("reset_globals")
|
||||
def test_nodeNumToId(iface_with_nodes):
|
||||
"""Test _nodeNumToId()"""
|
||||
iface = iface_with_nodes
|
||||
iface.myInfo.my_node_num = 2475227164
|
||||
@@ -518,7 +555,8 @@ def test_nodeNumToId(capsys, reset_globals, iface_with_nodes):
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
def test_nodeNumToId_not_found(capsys, reset_globals, iface_with_nodes):
|
||||
@pytest.mark.usefixtures("reset_globals")
|
||||
def test_nodeNumToId_not_found(iface_with_nodes):
|
||||
"""Test _nodeNumToId()"""
|
||||
iface = iface_with_nodes
|
||||
iface.myInfo.my_node_num = 2475227164
|
||||
@@ -527,7 +565,8 @@ def test_nodeNumToId_not_found(capsys, reset_globals, iface_with_nodes):
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
def test_nodeNumToId_to_all(capsys, reset_globals, iface_with_nodes):
|
||||
@pytest.mark.usefixtures("reset_globals")
|
||||
def test_nodeNumToId_to_all(iface_with_nodes):
|
||||
"""Test _nodeNumToId()"""
|
||||
iface = iface_with_nodes
|
||||
iface.myInfo.my_node_num = 2475227164
|
||||
@@ -536,7 +575,8 @@ def test_nodeNumToId_to_all(capsys, reset_globals, iface_with_nodes):
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
def test_getOrCreateByNum_minimal(capsys, reset_globals, iface_with_nodes):
|
||||
@pytest.mark.usefixtures("reset_globals")
|
||||
def test_getOrCreateByNum_minimal(iface_with_nodes):
|
||||
"""Test _getOrCreateByNum()"""
|
||||
iface = iface_with_nodes
|
||||
iface.myInfo.my_node_num = 2475227164
|
||||
@@ -545,7 +585,8 @@ def test_getOrCreateByNum_minimal(capsys, reset_globals, iface_with_nodes):
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
def test_getOrCreateByNum_not_found(capsys, reset_globals, iface_with_nodes):
|
||||
@pytest.mark.usefixtures("reset_globals")
|
||||
def test_getOrCreateByNum_not_found(iface_with_nodes):
|
||||
"""Test _getOrCreateByNum()"""
|
||||
iface = iface_with_nodes
|
||||
iface.myInfo.my_node_num = 2475227164
|
||||
@@ -555,7 +596,8 @@ def test_getOrCreateByNum_not_found(capsys, reset_globals, iface_with_nodes):
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
def test_getOrCreateByNum(capsys, reset_globals, iface_with_nodes):
|
||||
@pytest.mark.usefixtures("reset_globals")
|
||||
def test_getOrCreateByNum(iface_with_nodes):
|
||||
"""Test _getOrCreateByNum()"""
|
||||
iface = iface_with_nodes
|
||||
iface.myInfo.my_node_num = 2475227164
|
||||
@@ -581,7 +623,7 @@ def test_exit_with_exception(caplog):
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
def test_showNodes_exclude_self(capsys, caplog, reset_globals, iface_with_nodes):
|
||||
def test_showNodes_exclude_self(capsys, caplog, iface_with_nodes):
|
||||
"""Test that we hit that continue statement"""
|
||||
with caplog.at_level(logging.DEBUG):
|
||||
iface = iface_with_nodes
|
||||
@@ -592,7 +634,7 @@ def test_showNodes_exclude_self(capsys, caplog, reset_globals, iface_with_nodes)
|
||||
|
||||
|
||||
@pytest.mark.unitslow
|
||||
def test_waitForConfig(caplog, capsys):
|
||||
def test_waitForConfig(capsys):
|
||||
"""Test waitForConfig()"""
|
||||
iface = MeshInterface(noProto=True)
|
||||
# override how long to wait
|
||||
@@ -606,7 +648,7 @@ def test_waitForConfig(caplog, capsys):
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
def test_waitConnected_raises_an_exception(caplog, capsys):
|
||||
def test_waitConnected_raises_an_exception(capsys):
|
||||
"""Test waitConnected()"""
|
||||
iface = MeshInterface(noProto=True)
|
||||
with pytest.raises(Exception) as pytest_wrapped_e:
|
||||
@@ -619,7 +661,7 @@ def test_waitConnected_raises_an_exception(caplog, capsys):
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
def test_waitConnected_isConnected_timeout(caplog, capsys):
|
||||
def test_waitConnected_isConnected_timeout(capsys):
|
||||
"""Test waitConnected()"""
|
||||
with pytest.raises(Exception) as pytest_wrapped_e:
|
||||
iface = MeshInterface()
|
||||
|
||||
@@ -150,7 +150,7 @@ def test_setURL_valid_URL(caplog):
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
def test_setURL_valid_URL_but_no_settings(caplog, capsys):
|
||||
def test_setURL_valid_URL_but_no_settings(capsys):
|
||||
"""Test setURL"""
|
||||
iface = MagicMock(autospec=SerialInterface)
|
||||
url = "https://www.meshtastic.org/d/#"
|
||||
@@ -430,7 +430,7 @@ def test_deleteChannel_secondary_with_admin_channel_before_testing():
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
def test_getChannelByName(capsys):
|
||||
def test_getChannelByName():
|
||||
"""Get a channel by the name."""
|
||||
anode = Node('foo', 'bar')
|
||||
|
||||
@@ -457,7 +457,7 @@ def test_getChannelByName(capsys):
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
def test_getChannelByName_invalid_name(capsys):
|
||||
def test_getChannelByName_invalid_name():
|
||||
"""Get a channel by the name but one that is not present."""
|
||||
anode = Node('foo', 'bar')
|
||||
|
||||
@@ -484,7 +484,7 @@ def test_getChannelByName_invalid_name(capsys):
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
def test_getDisabledChannel(capsys):
|
||||
def test_getDisabledChannel():
|
||||
"""Get the first disabled channel."""
|
||||
anode = Node('foo', 'bar')
|
||||
|
||||
@@ -514,7 +514,7 @@ def test_getDisabledChannel(capsys):
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
def test_getDisabledChannel_where_all_channels_are_used(capsys):
|
||||
def test_getDisabledChannel_where_all_channels_are_used():
|
||||
"""Get the first disabled channel."""
|
||||
anode = Node('foo', 'bar')
|
||||
|
||||
@@ -538,7 +538,7 @@ def test_getDisabledChannel_where_all_channels_are_used(capsys):
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
def test_getAdminChannelIndex(capsys):
|
||||
def test_getAdminChannelIndex():
|
||||
"""Get the 'admin' channel index."""
|
||||
anode = Node('foo', 'bar')
|
||||
|
||||
@@ -565,7 +565,7 @@ def test_getAdminChannelIndex(capsys):
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
def test_getAdminChannelIndex_when_no_admin_named_channel(capsys):
|
||||
def test_getAdminChannelIndex_when_no_admin_named_channel():
|
||||
"""Get the 'admin' channel when there is not one."""
|
||||
anode = Node('foo', 'bar')
|
||||
|
||||
|
||||
@@ -23,6 +23,10 @@ def test_SerialInterface_single_port(mocked_findPorts, mocked_serial, mocked_ope
|
||||
iface.close()
|
||||
mocked_findPorts.assert_called()
|
||||
mocked_serial.assert_called()
|
||||
mocked_open.assert_called()
|
||||
mock_get.assert_called()
|
||||
mock_set.assert_called()
|
||||
mock_sleep.assert_called()
|
||||
out, err = capsys.readouterr()
|
||||
assert re.search(r'Nodes in mesh', out, re.MULTILINE)
|
||||
assert re.search(r'Preferences', out, re.MULTILINE)
|
||||
|
||||
@@ -19,7 +19,8 @@ def test_StreamInterface():
|
||||
|
||||
# Note: This takes a bit, so moving from unit to slow
|
||||
@pytest.mark.unitslow
|
||||
def test_StreamInterface_with_noProto(caplog, reset_globals):
|
||||
@pytest.mark.usefixtures("reset_globals")
|
||||
def test_StreamInterface_with_noProto(caplog):
|
||||
"""Test that we can instantiate a StreamInterface based on nonProto
|
||||
and we can read/write bytes from a mocked stream
|
||||
"""
|
||||
@@ -38,7 +39,8 @@ def test_StreamInterface_with_noProto(caplog, reset_globals):
|
||||
## Tip: If you want to see the print output, run with '-s' flag:
|
||||
## pytest -s meshtastic/tests/test_stream_interface.py::test_sendToRadioImpl
|
||||
@pytest.mark.unitslow
|
||||
def test_sendToRadioImpl(caplog, reset_globals):
|
||||
@pytest.mark.usefixtures("reset_globals")
|
||||
def test_sendToRadioImpl(caplog):
|
||||
"""Test _sendToRadioImpl()"""
|
||||
|
||||
# def add_header(b):
|
||||
|
||||
@@ -28,7 +28,24 @@ def test_TCPInterface(capsys):
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
def test_TCPInterface_without_connecting(capsys):
|
||||
def test_TCPInterface_exception():
|
||||
"""Test that we can instantiate a TCPInterface"""
|
||||
|
||||
def throw_an_exception():
|
||||
raise ValueError("Fake exception.")
|
||||
|
||||
with patch('meshtastic.tcp_interface.TCPInterface._socket_shutdown') as mock_shutdown:
|
||||
mock_shutdown.side_effect = throw_an_exception
|
||||
with patch('socket.socket') as mock_socket:
|
||||
iface = TCPInterface(hostname='localhost', noProto=True)
|
||||
iface.myConnect()
|
||||
iface.close()
|
||||
assert mock_socket.called
|
||||
assert mock_shutdown.called
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
def test_TCPInterface_without_connecting():
|
||||
"""Test that we can instantiate a TCPInterface with connectNow as false"""
|
||||
with patch('socket.socket'):
|
||||
iface = TCPInterface(hostname='localhost', noProto=True, connectNow=False)
|
||||
|
||||
@@ -14,7 +14,7 @@ from ..globals import Globals
|
||||
|
||||
@pytest.mark.unit
|
||||
@patch('platform.system')
|
||||
def test_Tunnel_on_non_linux_system(mock_platform_system, reset_globals):
|
||||
def test_Tunnel_on_non_linux_system(mock_platform_system):
|
||||
"""Test that we cannot instantiate a Tunnel on a non Linux system"""
|
||||
a_mock = MagicMock()
|
||||
a_mock.return_value = 'notLinux'
|
||||
@@ -29,7 +29,7 @@ def test_Tunnel_on_non_linux_system(mock_platform_system, reset_globals):
|
||||
|
||||
@pytest.mark.unit
|
||||
@patch('platform.system')
|
||||
def test_Tunnel_without_interface(mock_platform_system, reset_globals):
|
||||
def test_Tunnel_without_interface(mock_platform_system):
|
||||
"""Test that we can not instantiate a Tunnel without a valid interface"""
|
||||
a_mock = MagicMock()
|
||||
a_mock.return_value = 'Linux'
|
||||
@@ -41,7 +41,7 @@ def test_Tunnel_without_interface(mock_platform_system, reset_globals):
|
||||
|
||||
@pytest.mark.unitslow
|
||||
@patch('platform.system')
|
||||
def test_Tunnel_with_interface(mock_platform_system, caplog, reset_globals, iface_with_nodes):
|
||||
def test_Tunnel_with_interface(mock_platform_system, caplog, iface_with_nodes):
|
||||
"""Test that we can not instantiate a Tunnel without a valid interface"""
|
||||
iface = iface_with_nodes
|
||||
iface.myInfo.my_node_num = 2475227164
|
||||
@@ -60,7 +60,7 @@ def test_Tunnel_with_interface(mock_platform_system, caplog, reset_globals, ifac
|
||||
|
||||
@pytest.mark.unitslow
|
||||
@patch('platform.system')
|
||||
def test_onTunnelReceive_from_ourselves(mock_platform_system, caplog, reset_globals, iface_with_nodes):
|
||||
def test_onTunnelReceive_from_ourselves(mock_platform_system, caplog, iface_with_nodes):
|
||||
"""Test onTunnelReceive"""
|
||||
iface = iface_with_nodes
|
||||
iface.myInfo.my_node_num = 2475227164
|
||||
@@ -81,7 +81,7 @@ def test_onTunnelReceive_from_ourselves(mock_platform_system, caplog, reset_glob
|
||||
|
||||
@pytest.mark.unit
|
||||
@patch('platform.system')
|
||||
def test_onTunnelReceive_from_someone_else(mock_platform_system, caplog, reset_globals, iface_with_nodes):
|
||||
def test_onTunnelReceive_from_someone_else(mock_platform_system, caplog, iface_with_nodes):
|
||||
"""Test onTunnelReceive"""
|
||||
iface = iface_with_nodes
|
||||
iface.myInfo.my_node_num = 2475227164
|
||||
@@ -101,7 +101,7 @@ def test_onTunnelReceive_from_someone_else(mock_platform_system, caplog, reset_g
|
||||
|
||||
@pytest.mark.unitslow
|
||||
@patch('platform.system')
|
||||
def test_shouldFilterPacket_random(mock_platform_system, caplog, reset_globals, iface_with_nodes):
|
||||
def test_shouldFilterPacket_random(mock_platform_system, caplog, iface_with_nodes):
|
||||
"""Test _shouldFilterPacket()"""
|
||||
iface = iface_with_nodes
|
||||
iface.noProto = True
|
||||
@@ -119,7 +119,7 @@ def test_shouldFilterPacket_random(mock_platform_system, caplog, reset_globals,
|
||||
|
||||
@pytest.mark.unitslow
|
||||
@patch('platform.system')
|
||||
def test_shouldFilterPacket_in_blacklist(mock_platform_system, caplog, reset_globals, iface_with_nodes):
|
||||
def test_shouldFilterPacket_in_blacklist(mock_platform_system, caplog, iface_with_nodes):
|
||||
"""Test _shouldFilterPacket()"""
|
||||
iface = iface_with_nodes
|
||||
iface.noProto = True
|
||||
@@ -137,7 +137,7 @@ def test_shouldFilterPacket_in_blacklist(mock_platform_system, caplog, reset_glo
|
||||
|
||||
@pytest.mark.unitslow
|
||||
@patch('platform.system')
|
||||
def test_shouldFilterPacket_icmp(mock_platform_system, caplog, reset_globals, iface_with_nodes):
|
||||
def test_shouldFilterPacket_icmp(mock_platform_system, caplog, iface_with_nodes):
|
||||
"""Test _shouldFilterPacket()"""
|
||||
iface = iface_with_nodes
|
||||
iface.noProto = True
|
||||
@@ -156,7 +156,7 @@ def test_shouldFilterPacket_icmp(mock_platform_system, caplog, reset_globals, if
|
||||
|
||||
@pytest.mark.unit
|
||||
@patch('platform.system')
|
||||
def test_shouldFilterPacket_udp(mock_platform_system, caplog, reset_globals, iface_with_nodes):
|
||||
def test_shouldFilterPacket_udp(mock_platform_system, caplog, iface_with_nodes):
|
||||
"""Test _shouldFilterPacket()"""
|
||||
iface = iface_with_nodes
|
||||
iface.noProto = True
|
||||
@@ -175,7 +175,7 @@ def test_shouldFilterPacket_udp(mock_platform_system, caplog, reset_globals, ifa
|
||||
|
||||
@pytest.mark.unitslow
|
||||
@patch('platform.system')
|
||||
def test_shouldFilterPacket_udp_blacklisted(mock_platform_system, caplog, reset_globals, iface_with_nodes):
|
||||
def test_shouldFilterPacket_udp_blacklisted(mock_platform_system, caplog, iface_with_nodes):
|
||||
"""Test _shouldFilterPacket()"""
|
||||
iface = iface_with_nodes
|
||||
iface.noProto = True
|
||||
@@ -196,7 +196,7 @@ def test_shouldFilterPacket_udp_blacklisted(mock_platform_system, caplog, reset_
|
||||
|
||||
@pytest.mark.unit
|
||||
@patch('platform.system')
|
||||
def test_shouldFilterPacket_tcp(mock_platform_system, caplog, reset_globals, iface_with_nodes):
|
||||
def test_shouldFilterPacket_tcp(mock_platform_system, caplog, iface_with_nodes):
|
||||
"""Test _shouldFilterPacket()"""
|
||||
iface = iface_with_nodes
|
||||
iface.noProto = True
|
||||
@@ -215,7 +215,7 @@ def test_shouldFilterPacket_tcp(mock_platform_system, caplog, reset_globals, ifa
|
||||
|
||||
@pytest.mark.unitslow
|
||||
@patch('platform.system')
|
||||
def test_shouldFilterPacket_tcp_blacklisted(mock_platform_system, caplog, reset_globals, iface_with_nodes):
|
||||
def test_shouldFilterPacket_tcp_blacklisted(mock_platform_system, caplog, iface_with_nodes):
|
||||
"""Test _shouldFilterPacket()"""
|
||||
iface = iface_with_nodes
|
||||
iface.noProto = True
|
||||
@@ -236,7 +236,7 @@ def test_shouldFilterPacket_tcp_blacklisted(mock_platform_system, caplog, reset_
|
||||
|
||||
@pytest.mark.unitslow
|
||||
@patch('platform.system')
|
||||
def test_ipToNodeId_none(mock_platform_system, caplog, reset_globals, iface_with_nodes):
|
||||
def test_ipToNodeId_none(mock_platform_system, caplog, iface_with_nodes):
|
||||
"""Test _ipToNodeId()"""
|
||||
iface = iface_with_nodes
|
||||
iface.noProto = True
|
||||
@@ -252,7 +252,7 @@ def test_ipToNodeId_none(mock_platform_system, caplog, reset_globals, iface_with
|
||||
|
||||
@pytest.mark.unitslow
|
||||
@patch('platform.system')
|
||||
def test_ipToNodeId_all(mock_platform_system, caplog, reset_globals, iface_with_nodes):
|
||||
def test_ipToNodeId_all(mock_platform_system, caplog, iface_with_nodes):
|
||||
"""Test _ipToNodeId()"""
|
||||
iface = iface_with_nodes
|
||||
iface.noProto = True
|
||||
|
||||
@@ -10,7 +10,8 @@ from meshtastic.util import (fixme, stripnl, pskToString, our_exit,
|
||||
support_info, genPSK256, fromStr, fromPSK,
|
||||
quoteBooleans, catchAndIgnore,
|
||||
remove_keys_from_dict, Timeout, hexstr,
|
||||
ipstr, readnet_u16, findPorts, convert_mac_addr)
|
||||
ipstr, readnet_u16, findPorts, convert_mac_addr,
|
||||
snake_to_camel, camel_to_snake)
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
@@ -39,6 +40,7 @@ def test_fromStr():
|
||||
assert fromStr('100.01') == 100.01
|
||||
assert fromStr('123') == 123
|
||||
assert fromStr('abc') == 'abc'
|
||||
assert fromStr('123456789') == 123456789
|
||||
|
||||
|
||||
@pytest.mark.unitslow
|
||||
@@ -242,10 +244,31 @@ def test_readnet_u16():
|
||||
def test_findPorts_when_none_found(patch_comports):
|
||||
"""Test findPorts()"""
|
||||
assert not findPorts()
|
||||
patch_comports.assert_called()
|
||||
|
||||
|
||||
@pytest.mark.unitslow
|
||||
def test_convert_mac_addr():
|
||||
"""Test convert_mac_addr()"""
|
||||
assert convert_mac_addr('/c0gFyhb') == 'fd:cd:20:17:28:5b'
|
||||
assert convert_mac_addr('fd:cd:20:17:28:5b') == 'fd:cd:20:17:28:5b'
|
||||
assert convert_mac_addr('') == ''
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
def test_snake_to_camel():
|
||||
"""Test snake_to_camel"""
|
||||
assert snake_to_camel('') == ''
|
||||
assert snake_to_camel('foo') == 'foo'
|
||||
assert snake_to_camel('foo_bar') == 'fooBar'
|
||||
assert snake_to_camel('fooBar') == 'fooBar'
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
def test_camel_to_snake():
|
||||
"""Test camel_to_snake"""
|
||||
assert camel_to_snake('') == ''
|
||||
assert camel_to_snake('foo') == 'foo'
|
||||
assert camel_to_snake('Foo') == 'foo'
|
||||
assert camel_to_snake('fooBar') == 'foo_bar'
|
||||
assert camel_to_snake('fooBarBaz') == 'foo_bar_baz'
|
||||
|
||||
@@ -27,7 +27,7 @@ from meshtastic.util import ipstr, readnet_u16
|
||||
from meshtastic.globals import Globals
|
||||
|
||||
|
||||
def onTunnelReceive(packet, interface):
|
||||
def onTunnelReceive(packet, interface): # pylint: disable=W0613
|
||||
"""Callback for received tunneled messages from mesh."""
|
||||
logging.debug(f'in onTunnelReceive()')
|
||||
our_globals = Globals.getInstance()
|
||||
|
||||
@@ -3,6 +3,7 @@
|
||||
import traceback
|
||||
from queue import Queue
|
||||
import os
|
||||
import re
|
||||
import sys
|
||||
import base64
|
||||
import time
|
||||
@@ -189,17 +190,16 @@ def support_info():
|
||||
print('or wish to make feature requests, visit:')
|
||||
print('https://github.com/meshtastic/Meshtastic-python/issues')
|
||||
print('When adding an issue, be sure to include the following info:')
|
||||
print(' System: {0}'.format(platform.system()))
|
||||
print(' Platform: {0}'.format(platform.platform()))
|
||||
print(' Release: {0}'.format(platform.uname().release))
|
||||
print(' Machine: {0}'.format(platform.uname().machine))
|
||||
print(' Encoding (stdin): {0}'.format(sys.stdin.encoding))
|
||||
print(' Encoding (stdout): {0}'.format(sys.stdout.encoding))
|
||||
print(f' System: {platform.system()}')
|
||||
print(f' Platform: {platform.platform()}')
|
||||
print(f' Release: {platform.uname().release}')
|
||||
print(f' Machine: {platform.uname().machine}')
|
||||
print(f' Encoding (stdin): {sys.stdin.encoding}')
|
||||
print(f' Encoding (stdout): {sys.stdout.encoding}')
|
||||
the_version = pkg_resources.get_distribution("meshtastic").version
|
||||
print(' meshtastic: v{0}'.format(the_version))
|
||||
print(' Executable: {0}'.format(sys.argv[0]))
|
||||
print(' Python: {0} {1} {2}'.format(platform.python_version(),
|
||||
platform.python_implementation(), platform.python_compiler()))
|
||||
print(f' meshtastic: v{the_version}')
|
||||
print(f' Executable: {sys.argv[0]}')
|
||||
print(f' Python: {platform.python_version()} {platform.python_implementation()} {platform.python_compiler()}')
|
||||
print('')
|
||||
print('Please add the output from the command: meshtastic --info')
|
||||
|
||||
@@ -221,12 +221,12 @@ def remove_keys_from_dict(keys, adict):
|
||||
|
||||
def hexstr(barray):
|
||||
"""Print a string of hex digits"""
|
||||
return ":".join('{:02x}'.format(x) for x in barray)
|
||||
return ":".join(f'{x:02x}' for x in barray)
|
||||
|
||||
|
||||
def ipstr(barray):
|
||||
"""Print a string of ip digits"""
|
||||
return ".".join('{}'.format(x) for x in barray)
|
||||
return ".".join(f'{x}' for x in barray)
|
||||
|
||||
|
||||
def readnet_u16(p, offset):
|
||||
@@ -239,5 +239,21 @@ def convert_mac_addr(val):
|
||||
val - base64 encoded value (ex: '/c0gFyhb'))
|
||||
returns: a string formatted like a mac address (ex: 'fd:cd:20:17:28:5b')
|
||||
"""
|
||||
val_as_bytes = base64.b64decode(val)
|
||||
return hexstr(val_as_bytes)
|
||||
if not re.match("[0-9a-f]{2}([-:]?)[0-9a-f]{2}(\\1[0-9a-f]{2}){4}$", val):
|
||||
val_as_bytes = base64.b64decode(val)
|
||||
return hexstr(val_as_bytes)
|
||||
return val
|
||||
|
||||
|
||||
def snake_to_camel(a_string):
|
||||
"""convert snake_case to camelCase"""
|
||||
# split underscore using split
|
||||
temp = a_string.split('_')
|
||||
# joining result
|
||||
result = temp[0] + ''.join(ele.title() for ele in temp[1:])
|
||||
return result
|
||||
|
||||
|
||||
def camel_to_snake(a_string):
|
||||
"""convert camelCase to snake_case"""
|
||||
return ''.join(['_'+i.lower() if i.isupper() else i for i in a_string]).lstrip('_')
|
||||
|
||||
2
proto
2
proto
Submodule proto updated: 18fc4cdb52...d7b2791b7c
@@ -2,6 +2,9 @@
|
||||
|
||||
addopts = -m "not int and not smoke1 and not smoke2 and not smokewifi and not examples and not smokevirt"
|
||||
|
||||
filterwarnings =
|
||||
ignore::DeprecationWarning
|
||||
|
||||
markers =
|
||||
unit: marks tests as unit tests
|
||||
unitslow: marks slow unit tests
|
||||
|
||||
@@ -4,7 +4,6 @@ protobuf
|
||||
dotmap
|
||||
pexpect
|
||||
pyqrcode
|
||||
pygatt
|
||||
tabulate
|
||||
timeago
|
||||
webencodings
|
||||
@@ -18,3 +17,4 @@ pyyaml
|
||||
pytap2
|
||||
pdoc3
|
||||
pypubsub
|
||||
pygatt; platform_system == "Linux"
|
||||
|
||||
5
setup.py
5
setup.py
@@ -12,7 +12,7 @@ with open("README.md", "r") as fh:
|
||||
# This call to setup() does all the work
|
||||
setup(
|
||||
name="meshtastic",
|
||||
version="1.2.53",
|
||||
version="1.2.56",
|
||||
description="Python API & client shell for talking to Meshtastic devices",
|
||||
long_description=long_description,
|
||||
long_description_content_type="text/markdown",
|
||||
@@ -33,7 +33,8 @@ setup(
|
||||
include_package_data=True,
|
||||
install_requires=["pyserial>=3.4", "protobuf>=3.13.0",
|
||||
"pypubsub>=4.0.3", "dotmap>=1.3.14", "pexpect>=4.6.0", "pyqrcode>=1.2.1",
|
||||
"pygatt>=4.0.5", "tabulate>=0.8.9", "timeago>=1.0.15", "pyyaml"],
|
||||
"tabulate>=0.8.9", "timeago>=1.0.15", "pyyaml",
|
||||
"pygatt>=4.0.5 ; platform_system=='Linux'"],
|
||||
extras_require={
|
||||
'tunnel': ["pytap2>=2.0.0"]
|
||||
},
|
||||
|
||||
Reference in New Issue
Block a user