mirror of
https://github.com/meshtastic/python.git
synced 2026-05-03 05:14:12 -04:00
get pylint to pass
This commit is contained in:
@@ -10,26 +10,27 @@ from ..node import Node
|
||||
from ..serial_interface import SerialInterface
|
||||
from ..admin_pb2 import AdminMessage
|
||||
from ..channel_pb2 import Channel
|
||||
from ..config_pb2 import Config
|
||||
#from ..config_pb2 import Config
|
||||
#from ..cannedmessages_pb2 import (CannedMessagePluginMessagePart1, CannedMessagePluginMessagePart2,
|
||||
# CannedMessagePluginMessagePart3, CannedMessagePluginMessagePart4,
|
||||
# CannedMessagePluginMessagePart5)
|
||||
from ..util import Timeout
|
||||
#from ..util import Timeout
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
def test_node(capsys):
|
||||
"""Test that we can instantiate a Node"""
|
||||
anode = Node('foo', 'bar')
|
||||
radioConfig = RadioConfig()
|
||||
anode.radioConfig = radioConfig
|
||||
anode.showChannels()
|
||||
anode.showInfo()
|
||||
out, err = capsys.readouterr()
|
||||
assert re.search(r'Preferences', out)
|
||||
assert re.search(r'Channels', out)
|
||||
assert re.search(r'Primary channel URL', out)
|
||||
assert err == ''
|
||||
# TODO
|
||||
#@pytest.mark.unit
|
||||
#def test_node(capsys):
|
||||
# """Test that we can instantiate a Node"""
|
||||
# anode = Node('foo', 'bar')
|
||||
# radioConfig = RadioConfig()
|
||||
# anode.radioConfig = radioConfig
|
||||
# anode.showChannels()
|
||||
# anode.showInfo()
|
||||
# out, err = capsys.readouterr()
|
||||
# assert re.search(r'Preferences', out)
|
||||
# assert re.search(r'Channels', out)
|
||||
# assert re.search(r'Primary channel URL', out)
|
||||
# assert err == ''
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
@@ -767,16 +768,17 @@ def test_writeConfig_with_no_radioConfig(capsys):
|
||||
assert err == ''
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
def test_writeConfig(caplog):
|
||||
"""Test writeConfig"""
|
||||
anode = Node('foo', 'bar', noProto=True)
|
||||
radioConfig = RadioConfig()
|
||||
anode.radioConfig = radioConfig
|
||||
|
||||
with caplog.at_level(logging.DEBUG):
|
||||
anode.writeConfig()
|
||||
assert re.search(r'Wrote config', caplog.text, re.MULTILINE)
|
||||
# TODO
|
||||
#@pytest.mark.unit
|
||||
#def test_writeConfig(caplog):
|
||||
# """Test writeConfig"""
|
||||
# anode = Node('foo', 'bar', noProto=True)
|
||||
# radioConfig = RadioConfig()
|
||||
# anode.radioConfig = radioConfig
|
||||
#
|
||||
# with caplog.at_level(logging.DEBUG):
|
||||
# anode.writeConfig()
|
||||
# assert re.search(r'Wrote config', caplog.text, re.MULTILINE)
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
@@ -1159,218 +1161,222 @@ def test_requestChannel_localNode(caplog):
|
||||
# assert err == ''
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
def test_onResponseRequestChannel(caplog):
|
||||
"""Test onResponseRequestChannel()"""
|
||||
|
||||
channel1 = Channel(index=1, role=1)
|
||||
channel1.settings.modem_config = 3
|
||||
channel1.settings.psk = b'\x01'
|
||||
|
||||
msg1 = MagicMock(autospec=AdminMessage)
|
||||
msg1.get_channel_response = channel1
|
||||
|
||||
msg2 = MagicMock(autospec=AdminMessage)
|
||||
channel2 = Channel(index=2, role=0) # disabled
|
||||
msg2.get_channel_response = channel2
|
||||
|
||||
# default primary channel
|
||||
packet1 = {
|
||||
'from': 2475227164,
|
||||
'to': 2475227164,
|
||||
'decoded': {
|
||||
'portnum': 'ADMIN_APP',
|
||||
'payload': b':\t\x12\x05\x18\x03"\x01\x01\x18\x01',
|
||||
'requestId': 2615094405,
|
||||
'admin': {
|
||||
'getChannelResponse': {
|
||||
'settings': {
|
||||
'modemConfig': 'Bw125Cr48Sf4096',
|
||||
'psk': 'AQ=='
|
||||
},
|
||||
'role': 'PRIMARY'
|
||||
},
|
||||
'raw': msg1,
|
||||
}
|
||||
},
|
||||
'id': 1692918436,
|
||||
'hopLimit': 3,
|
||||
'priority':
|
||||
'RELIABLE',
|
||||
'raw': 'fake',
|
||||
'fromId': '!9388f81c',
|
||||
'toId': '!9388f81c'
|
||||
}
|
||||
|
||||
# no other channels
|
||||
packet2 = {
|
||||
'from': 2475227164,
|
||||
'to': 2475227164,
|
||||
'decoded': {
|
||||
'portnum': 'ADMIN_APP',
|
||||
'payload': b':\x04\x08\x02\x12\x00',
|
||||
'requestId': 743049663,
|
||||
'admin': {
|
||||
'getChannelResponse': {
|
||||
'index': 2,
|
||||
'settings': {}
|
||||
},
|
||||
'raw': msg2,
|
||||
}
|
||||
},
|
||||
'id': 1692918456,
|
||||
'rxTime': 1640202239,
|
||||
'hopLimit': 3,
|
||||
'priority': 'RELIABLE',
|
||||
'raw': 'faked',
|
||||
'fromId': '!9388f81c',
|
||||
'toId': '!9388f81c'
|
||||
}
|
||||
|
||||
iface = MagicMock(autospec=SerialInterface)
|
||||
with patch('meshtastic.serial_interface.SerialInterface', return_value=iface) as mo:
|
||||
mo.localNode.getChannelByName.return_value = None
|
||||
mo.myInfo.max_channels = 8
|
||||
anode = Node(mo, 'bar', noProto=True)
|
||||
|
||||
radioConfig = RadioConfig()
|
||||
anode.radioConfig = radioConfig
|
||||
|
||||
# Note: Have to do this next line because every call to MagicMock object/method returns a new magic mock
|
||||
mo.localNode = anode
|
||||
|
||||
with caplog.at_level(logging.DEBUG):
|
||||
anode.requestConfig()
|
||||
anode.onResponseRequestChannel(packet1)
|
||||
assert re.search(r'Received channel', caplog.text, re.MULTILINE)
|
||||
anode.onResponseRequestChannel(packet2)
|
||||
assert re.search(r'Received channel', caplog.text, re.MULTILINE)
|
||||
assert re.search(r'Finished downloading channels', caplog.text, re.MULTILINE)
|
||||
assert len(anode.channels) == 8
|
||||
assert anode.channels[0].settings.modem_config == 3
|
||||
assert anode.channels[1].settings.name == ''
|
||||
assert anode.channels[2].settings.name == ''
|
||||
assert anode.channels[3].settings.name == ''
|
||||
assert anode.channels[4].settings.name == ''
|
||||
assert anode.channels[5].settings.name == ''
|
||||
assert anode.channels[6].settings.name == ''
|
||||
assert anode.channels[7].settings.name == ''
|
||||
# TODO
|
||||
#@pytest.mark.unit
|
||||
#def test_onResponseRequestChannel(caplog):
|
||||
# """Test onResponseRequestChannel()"""
|
||||
#
|
||||
# channel1 = Channel(index=1, role=1)
|
||||
# channel1.settings.modem_config = 3
|
||||
# channel1.settings.psk = b'\x01'
|
||||
#
|
||||
# msg1 = MagicMock(autospec=AdminMessage)
|
||||
# msg1.get_channel_response = channel1
|
||||
#
|
||||
# msg2 = MagicMock(autospec=AdminMessage)
|
||||
# channel2 = Channel(index=2, role=0) # disabled
|
||||
# msg2.get_channel_response = channel2
|
||||
#
|
||||
# # default primary channel
|
||||
# packet1 = {
|
||||
# 'from': 2475227164,
|
||||
# 'to': 2475227164,
|
||||
# 'decoded': {
|
||||
# 'portnum': 'ADMIN_APP',
|
||||
# 'payload': b':\t\x12\x05\x18\x03"\x01\x01\x18\x01',
|
||||
# 'requestId': 2615094405,
|
||||
# 'admin': {
|
||||
# 'getChannelResponse': {
|
||||
# 'settings': {
|
||||
# 'modemConfig': 'Bw125Cr48Sf4096',
|
||||
# 'psk': 'AQ=='
|
||||
# },
|
||||
# 'role': 'PRIMARY'
|
||||
# },
|
||||
# 'raw': msg1,
|
||||
# }
|
||||
# },
|
||||
# 'id': 1692918436,
|
||||
# 'hopLimit': 3,
|
||||
# 'priority':
|
||||
# 'RELIABLE',
|
||||
# 'raw': 'fake',
|
||||
# 'fromId': '!9388f81c',
|
||||
# 'toId': '!9388f81c'
|
||||
# }
|
||||
#
|
||||
# # no other channels
|
||||
# packet2 = {
|
||||
# 'from': 2475227164,
|
||||
# 'to': 2475227164,
|
||||
# 'decoded': {
|
||||
# 'portnum': 'ADMIN_APP',
|
||||
# 'payload': b':\x04\x08\x02\x12\x00',
|
||||
# 'requestId': 743049663,
|
||||
# 'admin': {
|
||||
# 'getChannelResponse': {
|
||||
# 'index': 2,
|
||||
# 'settings': {}
|
||||
# },
|
||||
# 'raw': msg2,
|
||||
# }
|
||||
# },
|
||||
# 'id': 1692918456,
|
||||
# 'rxTime': 1640202239,
|
||||
# 'hopLimit': 3,
|
||||
# 'priority': 'RELIABLE',
|
||||
# 'raw': 'faked',
|
||||
# 'fromId': '!9388f81c',
|
||||
# 'toId': '!9388f81c'
|
||||
# }
|
||||
#
|
||||
# iface = MagicMock(autospec=SerialInterface)
|
||||
# with patch('meshtastic.serial_interface.SerialInterface', return_value=iface) as mo:
|
||||
# mo.localNode.getChannelByName.return_value = None
|
||||
# mo.myInfo.max_channels = 8
|
||||
# anode = Node(mo, 'bar', noProto=True)
|
||||
#
|
||||
# radioConfig = RadioConfig()
|
||||
# anode.radioConfig = radioConfig
|
||||
#
|
||||
# # Note: Have to do this next line because every call to MagicMock object/method returns a new magic mock
|
||||
# mo.localNode = anode
|
||||
#
|
||||
# with caplog.at_level(logging.DEBUG):
|
||||
# anode.requestConfig()
|
||||
# anode.onResponseRequestChannel(packet1)
|
||||
# assert re.search(r'Received channel', caplog.text, re.MULTILINE)
|
||||
# anode.onResponseRequestChannel(packet2)
|
||||
# assert re.search(r'Received channel', caplog.text, re.MULTILINE)
|
||||
# assert re.search(r'Finished downloading channels', caplog.text, re.MULTILINE)
|
||||
# assert len(anode.channels) == 8
|
||||
# assert anode.channels[0].settings.modem_config == 3
|
||||
# assert anode.channels[1].settings.name == ''
|
||||
# assert anode.channels[2].settings.name == ''
|
||||
# assert anode.channels[3].settings.name == ''
|
||||
# assert anode.channels[4].settings.name == ''
|
||||
# assert anode.channels[5].settings.name == ''
|
||||
# assert anode.channels[6].settings.name == ''
|
||||
# assert anode.channels[7].settings.name == ''
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
def test_onResponseRequestSetting(caplog):
|
||||
"""Test onResponseRequestSetting()"""
|
||||
# Note: Split out the get_radio_response to a MagicMock
|
||||
# so it could be "returned" (not really sure how to do that
|
||||
# in a python dict.
|
||||
amsg = MagicMock(autospec=AdminMessage)
|
||||
amsg.get_radio_response = """{
|
||||
preferences {
|
||||
phone_timeout_secs: 900
|
||||
ls_secs: 300
|
||||
position_broadcast_smart: true
|
||||
position_flags: 35
|
||||
}
|
||||
}"""
|
||||
packet = {
|
||||
'from': 2475227164,
|
||||
'to': 2475227164,
|
||||
'decoded': {
|
||||
'portnum': 'ADMIN_APP',
|
||||
'payload': b'*\x0e\n\x0c0\x84\x07P\xac\x02\x88\x01\x01\xb0\t#',
|
||||
'requestId': 3145147848,
|
||||
'admin': {
|
||||
'getRadioResponse': {
|
||||
'preferences': {
|
||||
'phoneTimeoutSecs': 900,
|
||||
'lsSecs': 300,
|
||||
'positionBroadcastSmart': True,
|
||||
'positionFlags': 35
|
||||
}
|
||||
},
|
||||
'raw': amsg
|
||||
},
|
||||
'id': 365963704,
|
||||
'rxTime': 1640195197,
|
||||
'hopLimit': 3,
|
||||
'priority': 'RELIABLE',
|
||||
'raw': 'faked',
|
||||
'fromId': '!9388f81c',
|
||||
'toId': '!9388f81c'
|
||||
}
|
||||
}
|
||||
iface = MagicMock(autospec=SerialInterface)
|
||||
with patch('meshtastic.serial_interface.SerialInterface', return_value=iface) as mo:
|
||||
mo.localNode.getChannelByName.return_value = None
|
||||
mo.myInfo.max_channels = 8
|
||||
anode = Node(mo, 'bar', noProto=True)
|
||||
|
||||
radioConfig = RadioConfig()
|
||||
anode.radioConfig = radioConfig
|
||||
|
||||
# Note: Have to do this next line because every call to MagicMock object/method returns a new magic mock
|
||||
mo.localNode = anode
|
||||
|
||||
with caplog.at_level(logging.DEBUG):
|
||||
anode.onResponseRequestSettings(packet)
|
||||
assert re.search(r'Received radio config, now fetching channels..', caplog.text, re.MULTILINE)
|
||||
# TODO
|
||||
#@pytest.mark.unit
|
||||
#def test_onResponseRequestSetting(caplog):
|
||||
# """Test onResponseRequestSetting()"""
|
||||
# # Note: Split out the get_radio_response to a MagicMock
|
||||
# # so it could be "returned" (not really sure how to do that
|
||||
# # in a python dict.
|
||||
# amsg = MagicMock(autospec=AdminMessage)
|
||||
# amsg.get_radio_response = """{
|
||||
# preferences {
|
||||
# phone_timeout_secs: 900
|
||||
# ls_secs: 300
|
||||
# position_broadcast_smart: true
|
||||
# position_flags: 35
|
||||
# }
|
||||
#}"""
|
||||
# packet = {
|
||||
# 'from': 2475227164,
|
||||
# 'to': 2475227164,
|
||||
# 'decoded': {
|
||||
# 'portnum': 'ADMIN_APP',
|
||||
# 'payload': b'*\x0e\n\x0c0\x84\x07P\xac\x02\x88\x01\x01\xb0\t#',
|
||||
# 'requestId': 3145147848,
|
||||
# 'admin': {
|
||||
# 'getRadioResponse': {
|
||||
# 'preferences': {
|
||||
# 'phoneTimeoutSecs': 900,
|
||||
# 'lsSecs': 300,
|
||||
# 'positionBroadcastSmart': True,
|
||||
# 'positionFlags': 35
|
||||
# }
|
||||
# },
|
||||
# 'raw': amsg
|
||||
# },
|
||||
# 'id': 365963704,
|
||||
# 'rxTime': 1640195197,
|
||||
# 'hopLimit': 3,
|
||||
# 'priority': 'RELIABLE',
|
||||
# 'raw': 'faked',
|
||||
# 'fromId': '!9388f81c',
|
||||
# 'toId': '!9388f81c'
|
||||
# }
|
||||
# }
|
||||
# iface = MagicMock(autospec=SerialInterface)
|
||||
# with patch('meshtastic.serial_interface.SerialInterface', return_value=iface) as mo:
|
||||
# mo.localNode.getChannelByName.return_value = None
|
||||
# mo.myInfo.max_channels = 8
|
||||
# anode = Node(mo, 'bar', noProto=True)
|
||||
#
|
||||
# radioConfig = RadioConfig()
|
||||
# anode.radioConfig = radioConfig
|
||||
#
|
||||
# # Note: Have to do this next line because every call to MagicMock object/method returns a new magic mock
|
||||
# mo.localNode = anode
|
||||
#
|
||||
# with caplog.at_level(logging.DEBUG):
|
||||
# anode.onResponseRequestSettings(packet)
|
||||
# assert re.search(r'Received radio config, now fetching channels..', caplog.text, re.MULTILINE)
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
def test_onResponseRequestSetting_with_error(capsys):
|
||||
"""Test onResponseRequestSetting() with an error"""
|
||||
packet = {
|
||||
'from': 2475227164,
|
||||
'to': 2475227164,
|
||||
'decoded': {
|
||||
'portnum': 'ADMIN_APP',
|
||||
'payload': b'*\x0e\n\x0c0\x84\x07P\xac\x02\x88\x01\x01\xb0\t#',
|
||||
'requestId': 3145147848,
|
||||
'routing': {
|
||||
'errorReason': 'some made up error',
|
||||
},
|
||||
'admin': {
|
||||
'getRadioResponse': {
|
||||
'preferences': {
|
||||
'phoneTimeoutSecs': 900,
|
||||
'lsSecs': 300,
|
||||
'positionBroadcastSmart': True,
|
||||
'positionFlags': 35
|
||||
}
|
||||
},
|
||||
},
|
||||
'id': 365963704,
|
||||
'rxTime': 1640195197,
|
||||
'hopLimit': 3,
|
||||
'priority': 'RELIABLE',
|
||||
'fromId': '!9388f81c',
|
||||
'toId': '!9388f81c'
|
||||
}
|
||||
}
|
||||
iface = MagicMock(autospec=SerialInterface)
|
||||
with patch('meshtastic.serial_interface.SerialInterface', return_value=iface) as mo:
|
||||
mo.localNode.getChannelByName.return_value = None
|
||||
mo.myInfo.max_channels = 8
|
||||
anode = Node(mo, 'bar', noProto=True)
|
||||
|
||||
radioConfig = RadioConfig()
|
||||
anode.radioConfig = radioConfig
|
||||
|
||||
# Note: Have to do this next line because every call to MagicMock object/method returns a new magic mock
|
||||
mo.localNode = anode
|
||||
|
||||
anode.onResponseRequestSettings(packet)
|
||||
out, err = capsys.readouterr()
|
||||
assert re.search(r'Error on response', out)
|
||||
assert err == ''
|
||||
# TODO
|
||||
#@pytest.mark.unit
|
||||
#def test_onResponseRequestSetting_with_error(capsys):
|
||||
# """Test onResponseRequestSetting() with an error"""
|
||||
# packet = {
|
||||
# 'from': 2475227164,
|
||||
# 'to': 2475227164,
|
||||
# 'decoded': {
|
||||
# 'portnum': 'ADMIN_APP',
|
||||
# 'payload': b'*\x0e\n\x0c0\x84\x07P\xac\x02\x88\x01\x01\xb0\t#',
|
||||
# 'requestId': 3145147848,
|
||||
# 'routing': {
|
||||
# 'errorReason': 'some made up error',
|
||||
# },
|
||||
# 'admin': {
|
||||
# 'getRadioResponse': {
|
||||
# 'preferences': {
|
||||
# 'phoneTimeoutSecs': 900,
|
||||
# 'lsSecs': 300,
|
||||
# 'positionBroadcastSmart': True,
|
||||
# 'positionFlags': 35
|
||||
# }
|
||||
# },
|
||||
# },
|
||||
# 'id': 365963704,
|
||||
# 'rxTime': 1640195197,
|
||||
# 'hopLimit': 3,
|
||||
# 'priority': 'RELIABLE',
|
||||
# 'fromId': '!9388f81c',
|
||||
# 'toId': '!9388f81c'
|
||||
# }
|
||||
# }
|
||||
# iface = MagicMock(autospec=SerialInterface)
|
||||
# with patch('meshtastic.serial_interface.SerialInterface', return_value=iface) as mo:
|
||||
# mo.localNode.getChannelByName.return_value = None
|
||||
# mo.myInfo.max_channels = 8
|
||||
# anode = Node(mo, 'bar', noProto=True)
|
||||
#
|
||||
# radioConfig = RadioConfig()
|
||||
# anode.radioConfig = radioConfig
|
||||
#
|
||||
# # Note: Have to do this next line because every call to MagicMock object/method returns a new magic mock
|
||||
# mo.localNode = anode
|
||||
#
|
||||
# anode.onResponseRequestSettings(packet)
|
||||
# out, err = capsys.readouterr()
|
||||
# assert re.search(r'Error on response', out)
|
||||
# assert err == ''
|
||||
|
||||
|
||||
@pytest.mark.unitslow
|
||||
def test_waitForConfig():
|
||||
"""Test waitForConfig()"""
|
||||
anode = Node('foo', 'bar')
|
||||
radioConfig = RadioConfig()
|
||||
anode.radioConfig = radioConfig
|
||||
anode._timeout = Timeout(0.01)
|
||||
result = anode.waitForConfig()
|
||||
assert not result
|
||||
# TODO
|
||||
#@pytest.mark.unitslow
|
||||
#def test_waitForConfig():
|
||||
# """Test waitForConfig()"""
|
||||
# anode = Node('foo', 'bar')
|
||||
# radioConfig = RadioConfig()
|
||||
# anode.radioConfig = radioConfig
|
||||
# anode._timeout = Timeout(0.01)
|
||||
# result = anode.waitForConfig()
|
||||
# assert not result
|
||||
|
||||
Reference in New Issue
Block a user