test MeshInterface

This commit is contained in:
Mike Kinney
2021-12-06 21:01:21 -08:00
parent 34553d2a25
commit cc9b6cd7b1
3 changed files with 38 additions and 17 deletions

View File

@@ -171,11 +171,14 @@ class MeshInterface:
def showInfo(self, file=sys.stdout):
"""Show human readable summary about this object"""
owner = f"Owner: {self.getLongName()} ({self.getShortName()})"
myinfo = f"\nMy info: {stripnl(MessageToJson(self.myInfo))}"
myinfo = ''
if self.myInfo:
myinfo = f"\nMy info: {stripnl(MessageToJson(self.myInfo))}"
mesh = "\nNodes in mesh:"
nodes = ""
for n in self.nodes.values():
nodes = nodes + f" {stripnl(n)}"
if self.nodes:
for n in self.nodes.values():
nodes = nodes + f" {stripnl(n)}"
infos = owner + myinfo + mesh + nodes
print(infos)
return infos
@@ -500,7 +503,7 @@ class MeshInterface:
def _sendToRadio(self, toRadio):
"""Send a ToRadio protobuf to the device"""
if self.noProto:
logging.warn(
logging.warning(
f"Not sending packet because protocol use is disabled by noProto")
else:
#logging.debug(f"Sending toRadio: {stripnl(toRadio)}")
@@ -643,11 +646,11 @@ class MeshInterface:
try:
asDict["fromId"] = self._nodeNumToId(asDict["from"])
except Exception as ex:
logging.warn(f"Not populating fromId {ex}")
logging.warning(f"Not populating fromId {ex}")
try:
asDict["toId"] = self._nodeNumToId(asDict["to"])
except Exception as ex:
logging.warn(f"Not populating toId {ex}")
logging.warning(f"Not populating toId {ex}")
# We could provide our objects as DotMaps - which work with . notation or as dictionaries
# asObj = DotMap(asDict)
@@ -893,7 +896,7 @@ class StreamInterface(MeshInterface):
pass
except serial.SerialException as ex:
if not self._wantExit: # We might intentionally get an exception during shutdown
logging.warn(
logging.warning(
f"Meshtastic serial port disconnected, disconnecting... {ex}")
except OSError as ex:
if not self._wantExit: # We might intentionally get an exception during shutdown

View File

@@ -97,11 +97,11 @@ class Node:
def showChannels(self):
"""Show human readable description of our channels"""
print("Channels:")
for c in self.channels:
if c.role != channel_pb2.Channel.Role.DISABLED:
cStr = stripnl(MessageToJson(c.settings))
print(
f" {channel_pb2.Channel.Role.Name(c.role)} psk={pskToString(c.settings.psk)} {cStr}")
if self.channels:
for c in self.channels:
if c.role != channel_pb2.Channel.Role.DISABLED:
cStr = stripnl(MessageToJson(c.settings))
print(f" {channel_pb2.Channel.Role.Name(c.role)} psk={pskToString(c.settings.psk)} {cStr}")
publicURL = self.getURL(includeAll=False)
adminURL = self.getURL(includeAll=True)
print(f"\nPrimary channel URL: {publicURL}")
@@ -110,8 +110,10 @@ class Node:
def showInfo(self):
"""Show human readable description of our node"""
print(
f"Preferences: {stripnl(MessageToJson(self.radioConfig.preferences))}\n")
prefs = ""
if self.radioConfig and self.radioConfig.preferences:
prefs = stripnl(MessageToJson(self.radioConfig.preferences))
print(f"Preferences: {prefs}\n")
self.showChannels()
def requestConfig(self):
@@ -151,18 +153,23 @@ class Node:
def deleteChannel(self, channelIndex):
"""Delete the specifed channelIndex and shift other channels up"""
ch = self.channels[channelIndex]
print('ch:', ch, ' channelIndex:', channelIndex)
if ch.role != channel_pb2.Channel.Role.SECONDARY:
raise Exception("Only SECONDARY channels can be deleted")
# we are careful here because if we move the "admin" channel the channelIndex we need to use
# for sending admin channels will also change
adminIndex = self.iface.localNode._getAdminChannelIndex()
print('adminIndex:', adminIndex)
self.channels.pop(channelIndex)
print('channelIndex:', channelIndex)
self._fixupChannels() # expand back to 8 channels
index = channelIndex
print('max_channels:', self.iface.myInfo.max_channels)
while index < self.iface.myInfo.max_channels:
print('index:', index)
self.writeChannel(index, adminIndex=adminIndex)
index += 1
@@ -231,9 +238,10 @@ class Node:
"""
# Only keep the primary/secondary channels, assume primary is first
channelSet = apponly_pb2.ChannelSet()
for c in self.channels:
if c.role == channel_pb2.Channel.Role.PRIMARY or (includeAll and c.role == channel_pb2.Channel.Role.SECONDARY):
channelSet.settings.append(c.settings)
if self.channels:
for c in self.channels:
if c.role == channel_pb2.Channel.Role.PRIMARY or (includeAll and c.role == channel_pb2.Channel.Role.SECONDARY):
channelSet.settings.append(c.settings)
bytes = channelSet.SerializeToString()
s = base64.urlsafe_b64encode(bytes).decode('ascii')
return f"https://www.meshtastic.org/d/#{s}".replace("=", "")

View File

@@ -6,6 +6,7 @@ import platform
import pytest
from meshtastic.node import pskToString
from meshtastic.__init__ import MeshInterface
@pytest.mark.unit
def test_pskToString_empty_string():
@@ -35,3 +36,12 @@ def test_pskToString_one_byte_non_zero_value():
def test_pskToString_many_bytes():
"""Test pskToString many bytes"""
assert pskToString(bytes([0x02, 0x01])) == 'secret'
@pytest.mark.unit
def test_MeshInterface():
"""Test that we instantiate a MeshInterface"""
iface = MeshInterface(noProto=True)
iface.showInfo()
iface.localNode.showInfo()
iface.close()