mirror of
https://github.com/meshtastic/python.git
synced 2026-01-02 21:07:55 -05:00
improve the error messages; change from exceptions where it makes sense
This commit is contained in:
@@ -17,7 +17,7 @@ from .tcp_interface import TCPInterface
|
||||
from .ble_interface import BLEInterface
|
||||
from . import test, remote_hardware
|
||||
from . import portnums_pb2, channel_pb2, mesh_pb2, radioconfig_pb2
|
||||
from .util import support_info
|
||||
from .util import support_info, our_exit
|
||||
|
||||
"""We only import the tunnel code if we are on a platform that can run it"""
|
||||
have_tunnel = platform.system() == 'Linux'
|
||||
@@ -410,7 +410,7 @@ def onConnected(interface):
|
||||
closeNow = True
|
||||
n = getNode()
|
||||
if len(args.ch_add) > 10:
|
||||
raise Exception("Channel name must be shorter. Channel not added.")
|
||||
our_exit("Warning: Channel name must be shorter. Channel not added.")
|
||||
ch = n.getChannelByName(args.ch_add)
|
||||
if ch:
|
||||
logging.error(
|
||||
@@ -418,7 +418,7 @@ def onConnected(interface):
|
||||
else:
|
||||
ch = n.getDisabledChannel()
|
||||
if not ch:
|
||||
raise Exception("No free channels were found")
|
||||
our_exit("Warning: No free channels were found")
|
||||
chs = channel_pb2.ChannelSettings()
|
||||
chs.psk = genPSK256()
|
||||
chs.name = args.ch_add
|
||||
@@ -442,8 +442,7 @@ def onConnected(interface):
|
||||
|
||||
if args.ch_longslow or args.ch_longfast or args.ch_mediumslow or args.ch_mediumfast or args.ch_shortslow or args.ch_shortfast:
|
||||
if channelIndex != 0:
|
||||
raise Exception(
|
||||
"standard channel settings can only be applied to the PRIMARY channel")
|
||||
our_exit("Warning: Standard channel settings can only be applied to the PRIMARY channel")
|
||||
|
||||
enable = True # force enable
|
||||
|
||||
|
||||
@@ -17,7 +17,7 @@ from google.protobuf.json_format import MessageToJson
|
||||
|
||||
|
||||
from . import portnums_pb2, mesh_pb2
|
||||
from .util import stripnl, Timeout
|
||||
from .util import stripnl, Timeout, our_exit
|
||||
from .node import Node
|
||||
from .__init__ import LOCAL_ADDR, BROADCAST_NUM, BROADCAST_ADDR, ResponseHandler, publishingThread, OUR_APP_VERSION, protocols
|
||||
|
||||
@@ -150,7 +150,7 @@ class MeshInterface:
|
||||
n = Node(self, nodeId)
|
||||
n.requestConfig()
|
||||
if not n.waitForConfig():
|
||||
raise Exception("Timed out waiting for node config")
|
||||
our_exit("Error: Timed out waiting for node config")
|
||||
return n
|
||||
|
||||
def sendText(self, text: AnyStr,
|
||||
@@ -205,10 +205,10 @@ class MeshInterface:
|
||||
data = data.SerializeToString()
|
||||
|
||||
if len(data) > mesh_pb2.Constants.DATA_PAYLOAD_LEN:
|
||||
raise Exception("Data payload too big")
|
||||
Exception("Data payload too big")
|
||||
|
||||
if portNum == portnums_pb2.PortNum.UNKNOWN_APP: # we are now more strict wrt port numbers
|
||||
raise Exception("A non-zero port number must be specified")
|
||||
our_exit("Warning: A non-zero port number must be specified")
|
||||
|
||||
meshPacket = mesh_pb2.MeshPacket()
|
||||
meshPacket.channel = channelIndex
|
||||
@@ -272,7 +272,7 @@ class MeshInterface:
|
||||
toRadio = mesh_pb2.ToRadio()
|
||||
|
||||
if destinationId is None:
|
||||
raise Exception("destinationId must not be None")
|
||||
our_exit("Warning: destinationId must not be None")
|
||||
elif isinstance(destinationId, int):
|
||||
nodeNum = destinationId
|
||||
elif destinationId == BROADCAST_ADDR:
|
||||
@@ -285,7 +285,7 @@ class MeshInterface:
|
||||
else:
|
||||
node = self.nodes.get(destinationId)
|
||||
if not node:
|
||||
raise Exception(f"NodeId {destinationId} not found in DB")
|
||||
our_exit(f"Warning: NodeId {destinationId} not found in DB")
|
||||
nodeNum = node['num']
|
||||
|
||||
meshPacket.to = nodeNum
|
||||
|
||||
@@ -5,7 +5,7 @@ import logging
|
||||
import base64
|
||||
from google.protobuf.json_format import MessageToJson
|
||||
from . import portnums_pb2, apponly_pb2, admin_pb2, channel_pb2
|
||||
from .util import pskToString, stripnl, Timeout
|
||||
from .util import pskToString, stripnl, Timeout, our_exit
|
||||
|
||||
|
||||
class Node:
|
||||
@@ -61,7 +61,7 @@ class Node:
|
||||
def writeConfig(self):
|
||||
"""Write the current (edited) radioConfig to the device"""
|
||||
if self.radioConfig is None:
|
||||
raise Exception("No RadioConfig has been read")
|
||||
our_exit("Error: No RadioConfig has been read")
|
||||
|
||||
p = admin_pb2.AdminMessage()
|
||||
p.set_radio.CopyFrom(self.radioConfig)
|
||||
@@ -82,7 +82,7 @@ class Node:
|
||||
"""Delete the specifed channelIndex and shift other channels up"""
|
||||
ch = self.channels[channelIndex]
|
||||
if ch.role != channel_pb2.Channel.Role.SECONDARY:
|
||||
raise Exception("Only SECONDARY channels can be deleted")
|
||||
our_exit("Warning: Only SECONDARY channels can be deleted")
|
||||
|
||||
# we are careful here because if we move the "admin" channel the channelIndex we need to use
|
||||
# for sending admin channels will also change
|
||||
@@ -172,7 +172,7 @@ class Node:
|
||||
def setURL(self, url):
|
||||
"""Set mesh network URL"""
|
||||
if self.radioConfig is None:
|
||||
raise Exception("No RadioConfig has been read")
|
||||
our_exit("Warning: No RadioConfig has been read")
|
||||
|
||||
# URLs are of the form https://www.meshtastic.org/d/#{base64_channel_set}
|
||||
# Split on '/#' to find the base64 encoded channel settings
|
||||
@@ -191,7 +191,7 @@ class Node:
|
||||
channelSet.ParseFromString(decodedURL)
|
||||
|
||||
if len(channelSet.settings) == 0:
|
||||
raise Exception("There were no settings.")
|
||||
our_exit("Warning: There were no settings.")
|
||||
|
||||
i = 0
|
||||
for chs in channelSet.settings:
|
||||
|
||||
@@ -7,7 +7,7 @@ import stat
|
||||
import serial
|
||||
|
||||
from .stream_interface import StreamInterface
|
||||
from .util import findPorts
|
||||
from .util import findPorts, our_exit
|
||||
|
||||
class SerialInterface(StreamInterface):
|
||||
"""Interface class for meshtastic devices over a serial link"""
|
||||
@@ -23,11 +23,13 @@ class SerialInterface(StreamInterface):
|
||||
|
||||
if devPath is None:
|
||||
ports = findPorts()
|
||||
logging.debug(f"ports:{ports}")
|
||||
if len(ports) == 0:
|
||||
raise Exception("No Meshtastic devices detected")
|
||||
our_exit("Warning: No Meshtastic devices detected.")
|
||||
elif len(ports) > 1:
|
||||
raise Exception(
|
||||
f"Multiple ports detected, you must specify a device, such as {ports[0]}")
|
||||
message = "Warning: Multiple serial ports were detected so one serial port must be specified with the '--port'.\n"
|
||||
message += f" Ports detected:{ports}"
|
||||
our_exit(message)
|
||||
else:
|
||||
devPath = ports[0]
|
||||
|
||||
|
||||
@@ -2,7 +2,7 @@
|
||||
|
||||
import pytest
|
||||
|
||||
from meshtastic.util import pskToString
|
||||
from meshtastic.util import pskToString, our_exit
|
||||
|
||||
@pytest.mark.unit
|
||||
def test_pskToString_empty_string():
|
||||
@@ -32,3 +32,21 @@ def test_pskToString_one_byte_non_zero_value():
|
||||
def test_pskToString_many_bytes():
|
||||
"""Test pskToString many bytes"""
|
||||
assert pskToString(bytes([0x02, 0x01])) == 'secret'
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
def test_our_exit_zero_return_value():
|
||||
"""Test our_exit with a zero return value"""
|
||||
with pytest.raises(SystemExit) as pytest_wrapped_e:
|
||||
our_exit("Warning: Some message", 0)
|
||||
assert pytest_wrapped_e.type == SystemExit
|
||||
assert pytest_wrapped_e.value.code == 0
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
def test_our_exit_non_zero_return_value():
|
||||
"""Test our_exit with a non-zero return value"""
|
||||
with pytest.raises(SystemExit) as pytest_wrapped_e:
|
||||
our_exit("Error: Some message", 1)
|
||||
assert pytest_wrapped_e.type == SystemExit
|
||||
assert pytest_wrapped_e.value.code == 1
|
||||
|
||||
@@ -114,6 +114,15 @@ class DeferredExecution():
|
||||
f"Unexpected error in deferred execution {sys.exc_info()[0]}")
|
||||
print(traceback.format_exc())
|
||||
|
||||
|
||||
def our_exit(message, return_value = 1):
|
||||
"""Print the message and return a value.
|
||||
return_value defaults to 1 (non-successful)
|
||||
"""
|
||||
print(message)
|
||||
sys.exit(return_value)
|
||||
|
||||
|
||||
def support_info():
|
||||
"""Print out info that is helping in support of the cli."""
|
||||
print('If having issues with meshtastic cli or python library')
|
||||
|
||||
Reference in New Issue
Block a user