mirror of
https://github.com/meshtastic/python.git
synced 2026-01-02 04:47:54 -05:00
Merge pull request #218 from mkinney/fix_remote_admin_message
refactor code to only call local node when necessary; fix tests
This commit is contained in:
@@ -18,6 +18,7 @@ from meshtastic import remote_hardware
|
||||
from meshtastic.ble_interface import BLEInterface
|
||||
from meshtastic import portnums_pb2, channel_pb2, radioconfig_pb2
|
||||
from meshtastic.globals import Globals
|
||||
from meshtastic.__init__ import BROADCAST_ADDR
|
||||
|
||||
|
||||
def onReceive(packet, interface):
|
||||
@@ -140,14 +141,6 @@ def onConnected(interface):
|
||||
if not args.export_config:
|
||||
print("Connected to radio")
|
||||
|
||||
def getNode():
|
||||
"""This operation could be expensive, so we try to cache the results"""
|
||||
targetNode = our_globals.get_target_node()
|
||||
if not targetNode:
|
||||
targetNode = interface.getNode(args.destOrLocal)
|
||||
our_globals.set_target_node(targetNode)
|
||||
return targetNode
|
||||
|
||||
if args.setlat or args.setlon or args.setalt:
|
||||
closeNow = True
|
||||
|
||||
@@ -179,12 +172,12 @@ def onConnected(interface):
|
||||
if args.set_owner:
|
||||
closeNow = True
|
||||
print(f"Setting device owner to {args.set_owner}")
|
||||
getNode().setOwner(args.set_owner)
|
||||
interface.getNode(args.dest).setOwner(args.set_owner)
|
||||
|
||||
if args.pos_fields:
|
||||
# If --pos-fields invoked with args, set position fields
|
||||
closeNow = True
|
||||
prefs = getNode().radioConfig.preferences
|
||||
prefs = interface.getNode(args.dest).radioConfig.preferences
|
||||
allFields = 0
|
||||
|
||||
try:
|
||||
@@ -201,12 +194,12 @@ def onConnected(interface):
|
||||
print(f"Setting position fields to {allFields}")
|
||||
setPref(prefs, 'position_flags', ('%d' % allFields))
|
||||
print("Writing modified preferences to device")
|
||||
getNode().writeConfig()
|
||||
interface.getNode(args.dest).writeConfig()
|
||||
|
||||
elif args.pos_fields is not None:
|
||||
# If --pos-fields invoked without args, read and display current value
|
||||
closeNow = True
|
||||
prefs = getNode().radioConfig.preferences
|
||||
prefs = interface.getNode(args.dest).radioConfig.preferences
|
||||
|
||||
fieldNames = []
|
||||
for bit in radioconfig_pb2.PositionFlags.values():
|
||||
@@ -225,86 +218,85 @@ def onConnected(interface):
|
||||
print(sorted(meshtastic.mesh_pb2.Team.keys()))
|
||||
else:
|
||||
print(f"Setting team to {meshtastic.mesh_pb2.Team.Name(v_team)}")
|
||||
getNode().setOwner(team=v_team)
|
||||
interface.getNode(args.dest).setOwner(team=v_team)
|
||||
|
||||
if args.set_ham:
|
||||
closeNow = True
|
||||
print(f"Setting Ham ID to {args.set_ham} and turning off encryption")
|
||||
getNode().setOwner(args.set_ham, is_licensed=True)
|
||||
interface.getNode(args.dest).setOwner(args.set_ham, is_licensed=True)
|
||||
# Must turn off encryption on primary channel
|
||||
getNode().turnOffEncryptionOnPrimaryChannel()
|
||||
interface.getNode(args.dest).turnOffEncryptionOnPrimaryChannel()
|
||||
|
||||
if args.reboot:
|
||||
closeNow = True
|
||||
getNode().reboot()
|
||||
interface.getNode(args.dest).reboot()
|
||||
|
||||
if args.sendtext:
|
||||
closeNow = True
|
||||
channelIndex = 0
|
||||
if args.ch_index is not None:
|
||||
channelIndex = int(args.ch_index)
|
||||
logging.debug(f'channelIndex:{channelIndex}')
|
||||
logging.debug(f'interface.localNode:{interface.localNode}')
|
||||
logging.debug(f'interface.localNode.channels:{interface.localNode.channels}')
|
||||
our_globals.set_target_node(interface.localNode)
|
||||
ch = getNode().getChannelByChannelIndex(channelIndex)
|
||||
ch = interface.localNode.getChannelByChannelIndex(channelIndex)
|
||||
logging.debug(f'ch:{ch}')
|
||||
if ch and ch.role != channel_pb2.Channel.Role.DISABLED:
|
||||
print(f"Sending text message {args.sendtext} to {args.destOrAll} on channelIndex:{channelIndex}")
|
||||
interface.sendText(args.sendtext, args.destOrAll, wantAck=True, channelIndex=channelIndex)
|
||||
print(f"Sending text message {args.sendtext} to {args.dest} on channelIndex:{channelIndex}")
|
||||
interface.sendText(args.sendtext, args.dest, wantAck=True, channelIndex=channelIndex)
|
||||
else:
|
||||
meshtastic.util.our_exit(f"Warning: {channelIndex} is not a valid channel. Channel must not be DISABLED.")
|
||||
|
||||
if args.sendping:
|
||||
payload = str.encode("test string")
|
||||
print(f"Sending ping message to {args.destOrAll}")
|
||||
interface.sendData(payload, args.destOrAll, portNum=portnums_pb2.PortNum.REPLY_APP,
|
||||
print(f"Sending ping message to {args.dest}")
|
||||
interface.sendData(payload, args.dest, portNum=portnums_pb2.PortNum.REPLY_APP,
|
||||
wantAck=True, wantResponse=True)
|
||||
|
||||
if args.gpio_wrb or args.gpio_rd or args.gpio_watch:
|
||||
rhc = remote_hardware.RemoteHardwareClient(interface)
|
||||
if args.dest == BROADCAST_ADDR:
|
||||
meshtastic.util.our_exit("Warning: Must use a destination node ID.")
|
||||
else:
|
||||
rhc = remote_hardware.RemoteHardwareClient(interface)
|
||||
|
||||
if args.gpio_wrb:
|
||||
bitmask = 0
|
||||
bitval = 0
|
||||
for wrpair in (args.gpio_wrb or []):
|
||||
bitmask |= 1 << int(wrpair[0])
|
||||
bitval |= int(wrpair[1]) << int(wrpair[0])
|
||||
print(f"Writing GPIO mask 0x{bitmask:x} with value 0x{bitval:x} to {args.dest}")
|
||||
rhc.writeGPIOs(args.dest, bitmask, bitval)
|
||||
closeNow = True
|
||||
if args.gpio_wrb:
|
||||
bitmask = 0
|
||||
bitval = 0
|
||||
for wrpair in (args.gpio_wrb or []):
|
||||
bitmask |= 1 << int(wrpair[0])
|
||||
bitval |= int(wrpair[1]) << int(wrpair[0])
|
||||
print(f"Writing GPIO mask 0x{bitmask:x} with value 0x{bitval:x} to {args.dest}")
|
||||
rhc.writeGPIOs(args.dest, bitmask, bitval)
|
||||
closeNow = True
|
||||
|
||||
if args.gpio_rd:
|
||||
bitmask = int(args.gpio_rd, 16)
|
||||
print(f"Reading GPIO mask 0x{bitmask:x} from {args.dest}")
|
||||
interface.mask = bitmask
|
||||
rhc.readGPIOs(args.dest, bitmask, None)
|
||||
if not interface.noProto:
|
||||
# wait up to X seconds for a response
|
||||
for _ in range(10):
|
||||
if args.gpio_rd:
|
||||
bitmask = int(args.gpio_rd, 16)
|
||||
print(f"Reading GPIO mask 0x{bitmask:x} from {args.dest}")
|
||||
interface.mask = bitmask
|
||||
rhc.readGPIOs(args.dest, bitmask, None)
|
||||
if not interface.noProto:
|
||||
# wait up to X seconds for a response
|
||||
for _ in range(10):
|
||||
time.sleep(1)
|
||||
if interface.gotResponse:
|
||||
break
|
||||
logging.debug(f'end of gpio_rd')
|
||||
|
||||
if args.gpio_watch:
|
||||
bitmask = int(args.gpio_watch, 16)
|
||||
print(f"Watching GPIO mask 0x{bitmask:x} from {args.dest}. Press ctrl-c to exit")
|
||||
while True:
|
||||
rhc.watchGPIOs(args.dest, bitmask)
|
||||
time.sleep(1)
|
||||
if interface.gotResponse:
|
||||
break
|
||||
logging.debug(f'end of gpio_rd')
|
||||
|
||||
if args.gpio_watch:
|
||||
bitmask = int(args.gpio_watch, 16)
|
||||
print(f"Watching GPIO mask 0x{bitmask:x} from {args.dest}. Press ctrl-c to exit")
|
||||
while True:
|
||||
rhc.watchGPIOs(args.dest, bitmask)
|
||||
time.sleep(1)
|
||||
|
||||
# handle settings
|
||||
if args.set:
|
||||
closeNow = True
|
||||
prefs = getNode().radioConfig.preferences
|
||||
prefs = interface.getNode(args.dest).radioConfig.preferences
|
||||
|
||||
# Handle the int/float/bool arguments
|
||||
for pref in args.set:
|
||||
setPref(prefs, pref[0], pref[1])
|
||||
|
||||
print("Writing modified preferences to device")
|
||||
getNode().writeConfig()
|
||||
interface.getNode(args.dest).writeConfig()
|
||||
|
||||
if args.configure:
|
||||
with open(args.configure[0], encoding='utf8') as file:
|
||||
@@ -313,11 +305,11 @@ def onConnected(interface):
|
||||
|
||||
if 'owner' in configuration:
|
||||
print(f"Setting device owner to {configuration['owner']}")
|
||||
getNode().setOwner(configuration['owner'])
|
||||
interface.getNode(args.dest).setOwner(configuration['owner'])
|
||||
|
||||
if 'channel_url' in configuration:
|
||||
print("Setting channel url to", configuration['channel_url'])
|
||||
getNode().setURL(configuration['channel_url'])
|
||||
interface.getNode(args.dest).setURL(configuration['channel_url'])
|
||||
|
||||
if 'location' in configuration:
|
||||
alt = 0
|
||||
@@ -342,11 +334,11 @@ def onConnected(interface):
|
||||
interface.localNode.writeConfig()
|
||||
|
||||
if 'user_prefs' in configuration:
|
||||
prefs = getNode().radioConfig.preferences
|
||||
prefs = interface.getNode(args.dest).radioConfig.preferences
|
||||
for pref in configuration['user_prefs']:
|
||||
setPref(prefs, pref, str(configuration['user_prefs'][pref]))
|
||||
print("Writing modified preferences to device")
|
||||
getNode().writeConfig()
|
||||
interface.getNode(args.dest).writeConfig()
|
||||
|
||||
if args.export_config:
|
||||
# export the configuration (the opposite of '--configure')
|
||||
@@ -355,7 +347,7 @@ def onConnected(interface):
|
||||
|
||||
if args.seturl:
|
||||
closeNow = True
|
||||
getNode().setURL(args.seturl)
|
||||
interface.getNode(args.dest).setURL(args.seturl)
|
||||
|
||||
# handle changing channels
|
||||
|
||||
@@ -363,7 +355,7 @@ def onConnected(interface):
|
||||
closeNow = True
|
||||
if len(args.ch_add) > 10:
|
||||
meshtastic.util.our_exit("Warning: Channel name must be shorter. Channel not added.")
|
||||
n = getNode()
|
||||
n = interface.getNode(args.dest)
|
||||
ch = n.getChannelByName(args.ch_add)
|
||||
if ch:
|
||||
meshtastic.util.our_exit(f"Warning: This node already has a '{args.ch_add}' channel. No changes were made.")
|
||||
@@ -391,7 +383,7 @@ def onConnected(interface):
|
||||
meshtastic.util.our_exit("Warning: Cannot delete primary channel.", 1)
|
||||
else:
|
||||
print(f"Deleting channel {channelIndex}")
|
||||
ch = getNode().deleteChannel(channelIndex)
|
||||
ch = interface.getNode(args.dest).deleteChannel(channelIndex)
|
||||
|
||||
ch_changes = [args.ch_longslow, args.ch_longfast,
|
||||
args.ch_mediumslow, args.ch_mediumfast,
|
||||
@@ -407,7 +399,7 @@ def onConnected(interface):
|
||||
channelIndex = 0
|
||||
else:
|
||||
meshtastic.util.our_exit("Warning: Need to specify '--ch-index'.", 1)
|
||||
ch = getNode().channels[channelIndex]
|
||||
ch = interface.getNode(args.dest).channels[channelIndex]
|
||||
|
||||
if any_primary_channel_changes or args.ch_enable or args.ch_disable:
|
||||
|
||||
@@ -468,21 +460,22 @@ def onConnected(interface):
|
||||
ch.role = channel_pb2.Channel.Role.DISABLED
|
||||
|
||||
print(f"Writing modified channels to device")
|
||||
getNode().writeChannel(channelIndex)
|
||||
interface.getNode(args.dest).writeChannel(channelIndex)
|
||||
|
||||
if args.info:
|
||||
print("")
|
||||
if not args.dest: # If we aren't trying to talk to our local node, don't show it
|
||||
# If we aren't trying to talk to our local node, don't show it
|
||||
if args.dest == BROADCAST_ADDR:
|
||||
interface.showInfo()
|
||||
|
||||
print("")
|
||||
getNode().showInfo()
|
||||
interface.getNode(args.dest).showInfo()
|
||||
closeNow = True # FIXME, for now we leave the link up while talking to remote nodes
|
||||
print("")
|
||||
|
||||
if args.get:
|
||||
closeNow = True
|
||||
prefs = getNode().radioConfig.preferences
|
||||
prefs = interface.getNode(args.dest).radioConfig.preferences
|
||||
|
||||
# Handle the int/float/bool arguments
|
||||
for pref in args.get:
|
||||
@@ -596,13 +589,8 @@ def common():
|
||||
channelIndex = int(args.ch_index)
|
||||
our_globals.set_channel_index(channelIndex)
|
||||
|
||||
# Some commands require dest to be set, so we now use destOrAll/destOrLocal for more lenient commands
|
||||
if not args.dest:
|
||||
args.destOrAll = "^all"
|
||||
args.destOrLocal = "^local"
|
||||
else:
|
||||
args.destOrAll = args.dest
|
||||
args.destOrLocal = args.dest # FIXME, temp hack for debugging remove
|
||||
args.dest = BROADCAST_ADDR
|
||||
|
||||
if not args.seriallog:
|
||||
if args.noproto:
|
||||
|
||||
@@ -27,7 +27,6 @@ class Globals:
|
||||
Globals.__instance = self
|
||||
self.args = None
|
||||
self.parser = None
|
||||
self.target_node = None
|
||||
self.channel_index = None
|
||||
self.logfile = None
|
||||
self.tunnelInstance = None
|
||||
@@ -36,7 +35,6 @@ class Globals:
|
||||
"""Reset all of our globals. If you add a member, add it to this method, too."""
|
||||
self.args = None
|
||||
self.parser = None
|
||||
self.target_node = None
|
||||
self.channel_index = None
|
||||
self.logfile = None
|
||||
self.tunnelInstance = None
|
||||
@@ -50,10 +48,6 @@ class Globals:
|
||||
"""Set the parser"""
|
||||
self.parser = parser
|
||||
|
||||
def set_target_node(self, target_node):
|
||||
"""Set the target_node"""
|
||||
self.target_node = target_node
|
||||
|
||||
def set_channel_index(self, channel_index):
|
||||
"""Set the channel_index"""
|
||||
self.channel_index = channel_index
|
||||
@@ -75,10 +69,6 @@ class Globals:
|
||||
"""Get parser"""
|
||||
return self.parser
|
||||
|
||||
def get_target_node(self):
|
||||
"""Get target_node"""
|
||||
return self.target_node
|
||||
|
||||
def get_channel_index(self):
|
||||
"""Get channel_index"""
|
||||
return self.channel_index
|
||||
|
||||
@@ -159,7 +159,7 @@ class MeshInterface:
|
||||
|
||||
def getNode(self, nodeId):
|
||||
"""Return a node object which contains device settings and channel info"""
|
||||
if nodeId == LOCAL_ADDR:
|
||||
if nodeId in (LOCAL_ADDR, BROADCAST_ADDR):
|
||||
return self.localNode
|
||||
else:
|
||||
n = meshtastic.node.Node(self, nodeId)
|
||||
|
||||
@@ -257,6 +257,7 @@ class Node:
|
||||
p = admin_pb2.AdminMessage()
|
||||
p.get_radio_request = True
|
||||
|
||||
# TODO: should we check that localNode has an 'admin' channel?
|
||||
# Show progress message for super slow operations
|
||||
if self != self.iface.localNode:
|
||||
print("Requesting preferences from remote node.")
|
||||
|
||||
@@ -40,28 +40,25 @@ class SerialInterface(StreamInterface):
|
||||
|
||||
# first we need to set the HUPCL so the device will not reboot based on RTS and/or DTR
|
||||
# see https://github.com/pyserial/pyserial/issues/124
|
||||
if not self.noProto:
|
||||
if platform.system() != 'Windows':
|
||||
with open(devPath, encoding='utf8') as f:
|
||||
attrs = termios.tcgetattr(f)
|
||||
attrs[2] = attrs[2] & ~termios.HUPCL
|
||||
termios.tcsetattr(f, termios.TCSAFLUSH, attrs)
|
||||
f.close()
|
||||
time.sleep(0.1)
|
||||
if platform.system() != 'Windows':
|
||||
with open(devPath, encoding='utf8') as f:
|
||||
attrs = termios.tcgetattr(f)
|
||||
attrs[2] = attrs[2] & ~termios.HUPCL
|
||||
termios.tcsetattr(f, termios.TCSAFLUSH, attrs)
|
||||
f.close()
|
||||
time.sleep(0.1)
|
||||
|
||||
self.stream = serial.Serial(devPath, 921600, exclusive=True, timeout=0.5, write_timeout=0)
|
||||
if not self.noProto:
|
||||
self.stream.flush()
|
||||
time.sleep(0.1)
|
||||
self.stream.flush()
|
||||
time.sleep(0.1)
|
||||
|
||||
StreamInterface.__init__(self, debugOut=debugOut, noProto=noProto, connectNow=connectNow)
|
||||
|
||||
def close(self):
|
||||
"""Close a connection to the device"""
|
||||
if not self.noProto:
|
||||
self.stream.flush()
|
||||
time.sleep(0.1)
|
||||
self.stream.flush()
|
||||
time.sleep(0.1)
|
||||
self.stream.flush()
|
||||
time.sleep(0.1)
|
||||
self.stream.flush()
|
||||
time.sleep(0.1)
|
||||
logging.debug("Closing Serial stream")
|
||||
StreamInterface.close(self)
|
||||
|
||||
@@ -119,7 +119,6 @@ def test_main_test_no_ports(patched_find_ports, reset_globals, capsys):
|
||||
sys.argv = ['', '--test']
|
||||
Globals.getInstance().set_args(sys.argv)
|
||||
|
||||
assert Globals.getInstance().get_target_node() is None
|
||||
with pytest.raises(SystemExit) as pytest_wrapped_e:
|
||||
main()
|
||||
assert pytest_wrapped_e.type == SystemExit
|
||||
@@ -137,7 +136,6 @@ def test_main_test_one_port(patched_find_ports, reset_globals, capsys):
|
||||
sys.argv = ['', '--test']
|
||||
Globals.getInstance().set_args(sys.argv)
|
||||
|
||||
assert Globals.getInstance().get_target_node() is None
|
||||
with pytest.raises(SystemExit) as pytest_wrapped_e:
|
||||
main()
|
||||
assert pytest_wrapped_e.type == SystemExit
|
||||
@@ -183,7 +181,7 @@ def test_main_test_two_ports_fails(patched_test_all, reset_globals, capsys):
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
def test_main_info(capsys, reset_globals):
|
||||
def test_main_info(capsys, caplog, reset_globals):
|
||||
"""Test --info"""
|
||||
sys.argv = ['', '--info']
|
||||
Globals.getInstance().set_args(sys.argv)
|
||||
@@ -192,13 +190,14 @@ def test_main_info(capsys, reset_globals):
|
||||
def mock_showInfo():
|
||||
print('inside mocked showInfo')
|
||||
iface.showInfo.side_effect = mock_showInfo
|
||||
with patch('meshtastic.serial_interface.SerialInterface', return_value=iface) as mo:
|
||||
main()
|
||||
out, err = capsys.readouterr()
|
||||
assert re.search(r'Connected to radio', out, re.MULTILINE)
|
||||
assert re.search(r'inside mocked showInfo', out, re.MULTILINE)
|
||||
assert err == ''
|
||||
mo.assert_called()
|
||||
with caplog.at_level(logging.DEBUG):
|
||||
with patch('meshtastic.serial_interface.SerialInterface', return_value=iface) as mo:
|
||||
main()
|
||||
out, err = capsys.readouterr()
|
||||
assert re.search(r'Connected to radio', out, re.MULTILINE)
|
||||
assert re.search(r'inside mocked showInfo', out, re.MULTILINE)
|
||||
assert err == ''
|
||||
mo.assert_called()
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
@@ -1125,18 +1124,14 @@ def test_main_pos_fields_no_args(capsys, reset_globals):
|
||||
pos_flags = MagicMock(autospec=meshtastic.radioconfig_pb2.PositionFlags)
|
||||
|
||||
with patch('meshtastic.serial_interface.SerialInterface') as mo:
|
||||
mo().getNode().radioConfig.preferences.position_flags = 35
|
||||
with patch('meshtastic.radioconfig_pb2.PositionFlags', return_value=pos_flags) as mrc:
|
||||
# kind of cheating here, we are setting up the node
|
||||
mocked_node = MagicMock(autospec=Node)
|
||||
anode = mocked_node()
|
||||
anode.radioConfig.preferences.position_flags = 35
|
||||
Globals.getInstance().set_target_node(anode)
|
||||
|
||||
mrc.values.return_value = [0, 1, 2, 4, 8, 16, 32, 64, 128, 256]
|
||||
# Note: When you use side_effect and a list, each call will use a value from the front of the list then
|
||||
# remove that value from the list. If there are three values in the list, we expect it to be called
|
||||
# three times.
|
||||
mrc.Name.side_effect = [ 'POS_ALTITUDE', 'POS_ALT_MSL', 'POS_BATTERY' ]
|
||||
mrc.Name.side_effect = ['POS_ALTITUDE', 'POS_ALT_MSL', 'POS_BATTERY']
|
||||
|
||||
main()
|
||||
|
||||
@@ -1217,13 +1212,9 @@ def test_main_get_with_valid_values(capsys, reset_globals):
|
||||
|
||||
with patch('meshtastic.serial_interface.SerialInterface') as mo:
|
||||
|
||||
# kind of cheating here, we are setting up the node
|
||||
mocked_node = MagicMock(autospec=Node)
|
||||
anode = mocked_node()
|
||||
anode.radioConfig.preferences.wifi_ssid = 'foo'
|
||||
anode.radioConfig.preferences.ls_secs = 300
|
||||
anode.radioConfig.preferences.fixed_position = False
|
||||
Globals.getInstance().set_target_node(anode)
|
||||
mo().getNode().radioConfig.preferences.wifi_ssid = 'foo'
|
||||
mo().getNode().radioConfig.preferences.ls_secs = 300
|
||||
mo().getNode().radioConfig.preferences.fixed_position = False
|
||||
|
||||
main()
|
||||
|
||||
@@ -1452,7 +1443,7 @@ def test_main_export_config_called_from_main(capsys, reset_globals):
|
||||
@pytest.mark.unit
|
||||
def test_main_gpio_rd_no_gpio_channel(capsys, reset_globals):
|
||||
"""Test --gpio_rd with no named gpio channel"""
|
||||
sys.argv = ['', '--gpio-rd', '0x10']
|
||||
sys.argv = ['', '--gpio-rd', '0x10', '--dest', '!foo']
|
||||
Globals.getInstance().set_args(sys.argv)
|
||||
|
||||
iface = MagicMock(autospec=SerialInterface)
|
||||
@@ -1473,9 +1464,9 @@ def test_main_gpio_rd_no_dest(capsys, reset_globals):
|
||||
sys.argv = ['', '--gpio-rd', '0x2000']
|
||||
Globals.getInstance().set_args(sys.argv)
|
||||
|
||||
channel = Channel(index=1, role=1)
|
||||
channel.settings.modem_config = 3
|
||||
channel.settings.psk = b'\x01'
|
||||
channel = Channel(index=2, role=2)
|
||||
channel.settings.psk = b'\x8a\x94y\x0e\xc6\xc9\x1e5\x91\x12@\xa60\xa8\xb43\x87\x00\xf2K\x0e\xe7\x7fAz\xcd\xf5\xb0\x900\xa84'
|
||||
channel.settings.name = 'gpio'
|
||||
|
||||
iface = MagicMock(autospec=SerialInterface)
|
||||
iface.localNode.getChannelByName.return_value = channel
|
||||
|
||||
@@ -3,15 +3,19 @@
|
||||
import re
|
||||
|
||||
|
||||
from unittest.mock import patch
|
||||
from unittest.mock import patch, mock_open
|
||||
import pytest
|
||||
|
||||
from ..serial_interface import SerialInterface
|
||||
|
||||
@pytest.mark.unit
|
||||
@patch("time.sleep")
|
||||
@patch("termios.tcsetattr")
|
||||
@patch("termios.tcgetattr")
|
||||
@patch("builtins.open", new_callable=mock_open, read_data="data")
|
||||
@patch('serial.Serial')
|
||||
@patch('meshtastic.util.findPorts', return_value=['/dev/ttyUSBfake'])
|
||||
def test_SerialInterface_single_port(mocked_findPorts, mocked_serial, capsys):
|
||||
def test_SerialInterface_single_port(mocked_findPorts, mocked_serial, mocked_open, mock_get, mock_set, mock_sleep, capsys):
|
||||
"""Test that we can instantiate a SerialInterface with a single port"""
|
||||
iface = SerialInterface(noProto=True)
|
||||
iface.showInfo()
|
||||
|
||||
Reference in New Issue
Block a user