Merge pull request #281 from prampec/extend_canned_message_length

Canned message - Extend messages length
This commit is contained in:
mkinney
2022-02-25 22:58:57 -08:00
committed by GitHub
7 changed files with 698 additions and 6 deletions

View File

@@ -23,7 +23,7 @@ ignore-patterns=mqtt_pb2.py,channel_pb2.py,environmental_measurement_pb2.py,admi
# no Warning level messages displayed, use"--disable=all --enable=classes
# --disable=W"
#
disable=invalid-name,fixme,logging-fstring-interpolation,too-many-statements,too-many-branches,too-many-locals,no-member,f-string-without-interpolation,protected-access,no-self-use,pointless-string-statement,too-few-public-methods,broad-except,no-else-return,no-else-raise,bare-except
disable=invalid-name,fixme,logging-fstring-interpolation,too-many-statements,too-many-branches,too-many-locals,no-member,f-string-without-interpolation,protected-access,no-self-use,pointless-string-statement,too-few-public-methods,broad-except,no-else-return,no-else-raise,bare-except,too-many-public-methods
[BASIC]

View File

@@ -22,6 +22,12 @@ lint:
slow:
pytest -m unit --durations=5
proto: FORCE
git submodule update --init --recursive
git pull --rebase
git submodule update --remote --merge
./bin/regen-protos.sh
# run the coverage report and open results in a browser
cov:
pytest --cov-report html --cov=meshtastic

View File

@@ -200,6 +200,12 @@ def onConnected(interface):
print(f"Setting device owner short to {args.set_owner_short}")
interface.getNode(args.dest).setOwner(long_name=None, short_name=args.set_owner_short)
# TODO: add to export-config and configure
if args.set_canned_message:
closeNow = True
print(f"Setting canned plugin message to {args.set_canned_message}")
interface.getNode(args.dest).set_canned_message(args.set_canned_message)
if args.pos_fields:
# If --pos-fields invoked with args, set position fields
closeNow = True
@@ -510,6 +516,11 @@ def onConnected(interface):
print(f"Writing modified channels to device")
interface.getNode(args.dest).writeChannel(channelIndex)
if args.get_canned_message:
closeNow = True
print("")
interface.getNode(args.dest).get_canned_message()
if args.info:
print("")
# If we aren't trying to talk to our local node, don't show it
@@ -746,6 +757,9 @@ def initParser():
parser.add_argument("--info", help="Read and display the radio config information",
action="store_true")
parser.add_argument("--get-canned-message", help="Show the canned message plugin message",
action="store_true")
parser.add_argument("--nodes", help="Print Node List in a pretty formatted table",
action="store_true")
@@ -808,6 +822,9 @@ def initParser():
parser.add_argument(
"--set-owner", help="Set device owner name", action="store")
parser.add_argument(
"--set-canned-message", help="Set the canned messages plugin message (up to 1000 characters).", action="store")
parser.add_argument(
"--set-owner-short", help="Set device owner short name", action="store")

View File

@@ -254,11 +254,12 @@ class MeshInterface:
meshPacket.decoded.payload = data
meshPacket.decoded.portnum = portNum
meshPacket.decoded.want_response = wantResponse
meshPacket.id = self._generatePacketId()
if onResponse is not None:
self._addResponseHandler(meshPacket.id, onResponse)
p = self._sendPacket(meshPacket, destinationId,
wantAck=wantAck, hopLimit=hopLimit)
if onResponse is not None:
self._addResponseHandler(p.id, onResponse)
return p
def sendPosition(self, latitude=0.0, longitude=0.0, altitude=0, timeSec=0,

View File

@@ -3,6 +3,7 @@
import logging
import base64
import time
from google.protobuf.json_format import MessageToJson
from meshtastic import portnums_pb2, apponly_pb2, admin_pb2, channel_pb2
from meshtastic.util import pskToString, stripnl, Timeout, our_exit, fromPSK
@@ -24,6 +25,13 @@ class Node:
self.partialChannels = None
self.noProto = noProto
self.cannedPluginMessage = None
self.cannedPluginMessagePart1 = None
self.cannedPluginMessagePart2 = None
self.cannedPluginMessagePart3 = None
self.cannedPluginMessagePart4 = None
def showChannels(self):
"""Show human readable description of our channels."""
print("Channels:")
@@ -56,6 +64,14 @@ class Node:
self.channels = None
self.partialChannels = [] # We keep our channels in a temp array until finished
# Note: We do not get the canned plugin message, unless get_canned_message() is called
self.cannedPluginMessage = None
self.cannedPluginMessagePart1 = None
self.cannedPluginMessagePart2 = None
self.cannedPluginMessagePart3 = None
self.cannedPluginMessagePart4 = None
self._requestSettings()
def turnOffEncryptionOnPrimaryChannel(self):
@@ -64,9 +80,9 @@ class Node:
print("Writing modified channels to device")
self.writeChannel(0)
def waitForConfig(self):
def waitForConfig(self, attribute='channels'):
"""Block until radio config is received. Returns True if config has been received."""
return self._timeout.waitForSet(self, attrs=('radioConfig', 'channels'))
return self._timeout.waitForSet(self, attrs=('radioConfig', attribute))
def writeConfig(self):
"""Write the current (edited) radioConfig to the device"""
@@ -237,7 +253,7 @@ class Node:
"""Handle the response packet for requesting settings _requestSettings()"""
logging.debug(f'onResponseRequestSetting() p:{p}')
errorFound = False
if 'routing' in p["decoded"]:
if "routing" in p["decoded"]:
if p["decoded"]["routing"]["errorReason"] != "NONE":
errorFound = True
print(f'Error on response: {p["decoded"]["routing"]["errorReason"]}')
@@ -268,6 +284,156 @@ class Node:
return self._sendAdmin(p, wantResponse=True, onResponse=self.onResponseRequestSettings)
def onResponseRequestCannedMessagePluginMessagePart1(self, p):
"""Handle the response packet for requesting canned message plugin message part 1"""
logging.debug(f'onResponseRequestCannedMessagePluginMessagePart1() p:{p}')
errorFound = False
if "routing" in p["decoded"]:
if p["decoded"]["routing"]["errorReason"] != "NONE":
errorFound = True
print(f'Error on response: {p["decoded"]["routing"]["errorReason"]}')
if errorFound is False:
if "decoded" in p:
if "admin" in p["decoded"]:
if "raw" in p["decoded"]["admin"]:
self.cannedPluginMessagePart1 = p["decoded"]["admin"]["raw"].get_canned_message_plugin_part1_response
logging.debug(f'self.cannedPluginMessagePart1:{self.cannedPluginMessagePart1}')
self.gotResponse = True
def onResponseRequestCannedMessagePluginMessagePart2(self, p):
"""Handle the response packet for requesting canned message plugin message part 2"""
logging.debug(f'onResponseRequestCannedMessagePluginMessagePart2() p:{p}')
errorFound = False
if "routing" in p["decoded"]:
if p["decoded"]["routing"]["errorReason"] != "NONE":
errorFound = True
print(f'Error on response: {p["decoded"]["routing"]["errorReason"]}')
if errorFound is False:
if "decoded" in p:
if "admin" in p["decoded"]:
if "raw" in p["decoded"]["admin"]:
self.cannedPluginMessagePart2 = p["decoded"]["admin"]["raw"].get_canned_message_plugin_part2_response
logging.debug(f'self.cannedPluginMessagePart2:{self.cannedPluginMessagePart2}')
self.gotResponse = True
def onResponseRequestCannedMessagePluginMessagePart3(self, p):
"""Handle the response packet for requesting canned message plugin message part 3"""
logging.debug(f'onResponseRequestCannedMessagePluginMessagePart3() p:{p}')
errorFound = False
if "routing" in p["decoded"]:
if p["decoded"]["routing"]["errorReason"] != "NONE":
errorFound = True
print(f'Error on response: {p["decoded"]["routing"]["errorReason"]}')
if errorFound is False:
if "decoded" in p:
if "admin" in p["decoded"]:
if "raw" in p["decoded"]["admin"]:
self.cannedPluginMessagePart3 = p["decoded"]["admin"]["raw"].get_canned_message_plugin_part3_response
logging.debug(f'self.cannedPluginMessagePart3:{self.cannedPluginMessagePart3}')
self.gotResponse = True
def onResponseRequestCannedMessagePluginMessagePart4(self, p):
"""Handle the response packet for requesting canned message plugin message part 4"""
logging.debug(f'onResponseRequestCannedMessagePluginMessagePart4() p:{p}')
errorFound = False
if "routing" in p["decoded"]:
if p["decoded"]["routing"]["errorReason"] != "NONE":
errorFound = True
print(f'Error on response: {p["decoded"]["routing"]["errorReason"]}')
if errorFound is False:
if "decoded" in p:
if "admin" in p["decoded"]:
if "raw" in p["decoded"]["admin"]:
self.cannedPluginMessagePart4 = p["decoded"]["admin"]["raw"].get_canned_message_plugin_part4_response
logging.debug(f'self.cannedPluginMessagePart4:{self.cannedPluginMessagePart4}')
self.gotResponse = True
def get_canned_message(self):
"""Get the canned message string. Concatenate all pieces together and return a single string."""
logging.debug(f'in get_canned_message()')
if not self.cannedPluginMessage:
p1 = admin_pb2.AdminMessage()
p1.get_canned_message_plugin_part1_request = True
self.gotResponse = False
self._sendAdmin(p1, wantResponse=True, onResponse=self.onResponseRequestCannedMessagePluginMessagePart1)
while self.gotResponse is False:
time.sleep(0.1)
p2 = admin_pb2.AdminMessage()
p2.get_canned_message_plugin_part2_request = True
self.gotResponse = False
self._sendAdmin(p2, wantResponse=True, onResponse=self.onResponseRequestCannedMessagePluginMessagePart2)
while self.gotResponse is False:
time.sleep(0.1)
p3 = admin_pb2.AdminMessage()
p3.get_canned_message_plugin_part3_request = True
self.gotResponse = False
self._sendAdmin(p3, wantResponse=True, onResponse=self.onResponseRequestCannedMessagePluginMessagePart3)
while self.gotResponse is False:
time.sleep(0.1)
p4 = admin_pb2.AdminMessage()
p4.get_canned_message_plugin_part4_request = True
self.gotResponse = False
self._sendAdmin(p4, wantResponse=True, onResponse=self.onResponseRequestCannedMessagePluginMessagePart4)
while self.gotResponse is False:
time.sleep(0.1)
# TODO: This feels wrong to have a sleep here. Is there a way to ensure that
# all requests are complete? Perhaps change to a while loop any parts are None... maybe?
time.sleep(3)
logging.debug(f'self.cannedPluginMessagePart1:{self.cannedPluginMessagePart1}')
logging.debug(f'self.cannedPluginMessagePart2:{self.cannedPluginMessagePart2}')
logging.debug(f'self.cannedPluginMessagePart3:{self.cannedPluginMessagePart3}')
logging.debug(f'self.cannedPluginMessagePart4:{self.cannedPluginMessagePart4}')
self.cannedPluginMessage = ""
if self.cannedPluginMessagePart1:
self.cannedPluginMessage += self.cannedPluginMessagePart1
if self.cannedPluginMessagePart2:
self.cannedPluginMessage += self.cannedPluginMessagePart2
if self.cannedPluginMessagePart3:
self.cannedPluginMessage += self.cannedPluginMessagePart3
if self.cannedPluginMessagePart4:
self.cannedPluginMessage += self.cannedPluginMessagePart4
print(f'canned_plugin_message:{self.cannedPluginMessage}')
logging.debug(f'canned_plugin_message:{self.cannedPluginMessage}')
return self.cannedPluginMessage
def set_canned_message(self, message):
"""Set the canned message. Split into parts of 200 chars each."""
if len(message) > 800:
our_exit("Warning: The canned message must be less than 800 characters.")
# split into chunks
chunks = []
chunks_size = 200
for i in range(0, len(message), chunks_size):
chunks.append(message[i: i + chunks_size])
# for each chunk, send a message to set the values
#for i in range(0, len(chunks)):
for i, chunk in enumerate(chunks):
p = admin_pb2.AdminMessage()
# TODO: should be a way to improve this
if i == 0:
p.set_canned_message_plugin_part1 = chunk
elif i == 1:
p.set_canned_message_plugin_part2 = chunk
elif i == 2:
p.set_canned_message_plugin_part3 = chunk
elif i == 3:
p.set_canned_message_plugin_part4 = chunk
logging.debug(f"Setting canned message '{chunk}' part {i+1}")
self._sendAdmin(p)
def exitSimulator(self):
"""Tell a simulator node to exit (this message
is ignored for other nodes)"""

View File

@@ -442,6 +442,43 @@ def test_main_set_owner_short_to_bob(capsys):
mo.assert_called()
@pytest.mark.unit
@pytest.mark.usefixtures("reset_globals")
def test_main_set_canned_messages(capsys):
"""Test --set-canned-message """
sys.argv = ['', '--set-canned-message', 'foo']
Globals.getInstance().set_args(sys.argv)
iface = MagicMock(autospec=SerialInterface)
with patch('meshtastic.serial_interface.SerialInterface', return_value=iface) as mo:
main()
out, err = capsys.readouterr()
assert re.search(r'Connected to radio', out, re.MULTILINE)
assert re.search(r'Setting canned plugin message to foo', out, re.MULTILINE)
assert err == ''
mo.assert_called()
@pytest.mark.unit
@pytest.mark.usefixtures("reset_globals")
def test_main_get_canned_messages(capsys, caplog, iface_with_nodes):
"""Test --get-canned-message """
sys.argv = ['', '--get-canned-message']
Globals.getInstance().set_args(sys.argv)
iface = iface_with_nodes
iface.localNode.cannedPluginMessage = 'foo'
with caplog.at_level(logging.DEBUG):
with patch('meshtastic.serial_interface.SerialInterface', return_value=iface) as mo:
main()
out, err = capsys.readouterr()
assert re.search(r'Connected to radio', out, re.MULTILINE)
assert re.search(r'canned_plugin_message:foo', out, re.MULTILINE)
assert err == ''
mo.assert_called()
@pytest.mark.unit
@pytest.mark.usefixtures("reset_globals")
def test_main_set_ham_to_KI123(capsys):

View File

@@ -11,6 +11,9 @@ from ..serial_interface import SerialInterface
from ..admin_pb2 import AdminMessage
from ..channel_pb2 import Channel
from ..radioconfig_pb2 import RadioConfig
from ..cannedmessages_pb2 import (CannedMessagePluginMessagePart1, CannedMessagePluginMessagePart2,
CannedMessagePluginMessagePart3, CannedMessagePluginMessagePart4,
CannedMessagePluginMessagePart5)
from ..util import Timeout
@@ -43,6 +46,122 @@ def test_node_requestConfig(capsys):
assert err == ''
@pytest.mark.unit
def test_node_get_canned_message_with_all_parts(capsys):
"""Test run get_canned_message()"""
iface = MagicMock(autospec=SerialInterface)
amesg = MagicMock(autospec=AdminMessage)
with patch('meshtastic.serial_interface.SerialInterface', return_value=iface) as mo:
with patch('meshtastic.admin_pb2.AdminMessage', return_value=amesg):
# we have a sleep in this method, so override it so it goes fast
with patch('time.sleep'):
anode = Node(mo, 'bar')
anode.cannedPluginMessagePart1 = 'a'
anode.cannedPluginMessagePart2 = 'b'
anode.cannedPluginMessagePart3 = 'c'
anode.cannedPluginMessagePart4 = 'd'
anode.cannedPluginMessagePart5 = 'e'
anode.get_canned_message()
out, err = capsys.readouterr()
assert re.search(r'canned_plugin_message:abcde', out, re.MULTILINE)
assert err == ''
@pytest.mark.unit
def test_node_get_canned_message_with_some_parts(capsys):
"""Test run get_canned_message()"""
iface = MagicMock(autospec=SerialInterface)
amesg = MagicMock(autospec=AdminMessage)
with patch('meshtastic.serial_interface.SerialInterface', return_value=iface) as mo:
with patch('meshtastic.admin_pb2.AdminMessage', return_value=amesg):
# we have a sleep in this method, so override it so it goes fast
with patch('time.sleep'):
anode = Node(mo, 'bar')
anode.cannedPluginMessagePart1 = 'a'
anode.get_canned_message()
out, err = capsys.readouterr()
assert re.search(r'canned_plugin_message:a', out, re.MULTILINE)
assert err == ''
@pytest.mark.unit
def test_node_set_canned_message_one_part(caplog):
"""Test run set_canned_message()"""
iface = MagicMock(autospec=SerialInterface)
amesg = MagicMock(autospec=AdminMessage)
with caplog.at_level(logging.DEBUG):
with patch('meshtastic.serial_interface.SerialInterface', return_value=iface) as mo:
with patch('meshtastic.admin_pb2.AdminMessage', return_value=amesg):
anode = Node(mo, 'bar')
anode.set_canned_message('foo')
assert re.search(r"Setting canned message 'foo' part 1", caplog.text, re.MULTILINE)
assert not re.search(r"Setting canned message '' part 2", caplog.text, re.MULTILINE)
@pytest.mark.unit
def test_node_set_canned_message_200(caplog):
"""Test run set_canned_message() 200 characters long"""
iface = MagicMock(autospec=SerialInterface)
amesg = MagicMock(autospec=AdminMessage)
with caplog.at_level(logging.DEBUG):
with patch('meshtastic.serial_interface.SerialInterface', return_value=iface) as mo:
with patch('meshtastic.admin_pb2.AdminMessage', return_value=amesg):
anode = Node(mo, 'bar')
message_200_chars_long = 'a' * 200
anode.set_canned_message(message_200_chars_long)
assert re.search(r" part 1", caplog.text, re.MULTILINE)
assert not re.search(r"Setting canned message '' part 2", caplog.text, re.MULTILINE)
@pytest.mark.unit
def test_node_set_canned_message_201(caplog):
"""Test run set_canned_message() 201 characters long"""
iface = MagicMock(autospec=SerialInterface)
amesg = MagicMock(autospec=AdminMessage)
with caplog.at_level(logging.DEBUG):
with patch('meshtastic.serial_interface.SerialInterface', return_value=iface) as mo:
with patch('meshtastic.admin_pb2.AdminMessage', return_value=amesg):
anode = Node(mo, 'bar')
message_201_chars_long = 'a' * 201
anode.set_canned_message(message_201_chars_long)
assert re.search(r" part 1", caplog.text, re.MULTILINE)
assert re.search(r"Setting canned message 'a' part 2", caplog.text, re.MULTILINE)
@pytest.mark.unit
def test_node_set_canned_message_1000(caplog):
"""Test run set_canned_message() 1000 characters long"""
iface = MagicMock(autospec=SerialInterface)
amesg = MagicMock(autospec=AdminMessage)
with caplog.at_level(logging.DEBUG):
with patch('meshtastic.serial_interface.SerialInterface', return_value=iface) as mo:
with patch('meshtastic.admin_pb2.AdminMessage', return_value=amesg):
anode = Node(mo, 'bar')
message_1000_chars_long = 'a' * 1000
anode.set_canned_message(message_1000_chars_long)
assert re.search(r" part 1", caplog.text, re.MULTILINE)
assert re.search(r" part 2", caplog.text, re.MULTILINE)
assert re.search(r" part 3", caplog.text, re.MULTILINE)
assert re.search(r" part 4", caplog.text, re.MULTILINE)
assert re.search(r" part 5", caplog.text, re.MULTILINE)
@pytest.mark.unit
def test_node_set_canned_message_1001(capsys):
"""Test run set_canned_message() 1001 characters long"""
iface = MagicMock(autospec=SerialInterface)
with pytest.raises(SystemExit) as pytest_wrapped_e:
with patch('meshtastic.serial_interface.SerialInterface', return_value=iface) as mo:
anode = Node(mo, 'bar')
message_1001_chars_long = 'a' * 1001
anode.set_canned_message(message_1001_chars_long)
assert pytest_wrapped_e.type == SystemExit
assert pytest_wrapped_e.value.code == 1
out, err = capsys.readouterr()
assert re.search(r'Warning: The canned message', out, re.MULTILINE)
assert err == ''
@pytest.mark.unit
def test_setOwner_and_team(caplog):
"""Test setOwner"""
@@ -694,6 +813,352 @@ def test_requestChannel_localNode(caplog):
assert not re.search(r'from remote node', caplog.text, re.MULTILINE)
@pytest.mark.unit
def test_onResponseRequestCannedMessagePluginMesagePart1(caplog):
"""Test onResponseRequestCannedMessagePluginMessagePart1()"""
part1 = CannedMessagePluginMessagePart1()
part1.text = 'foo1'
msg1 = MagicMock(autospec=AdminMessage)
msg1.get_canned_message_plugin_part1_response = part1
packet = {
'from': 682968612,
'to': 682968612,
'decoded': {
'portnum': 'ADMIN_APP',
'payload': 'faked',
'requestId': 927039000,
'admin': {
'getCannedMessagePluginPart1Response': {'text': 'foo1'},
'raw': msg1
}
},
'id': 589440320,
'rxTime': 1642710843,
'hopLimit': 3,
'priority': 'RELIABLE',
'raw': 'faked',
'fromId': '!28b54624',
'toId': '!28b54624'
}
iface = MagicMock(autospec=SerialInterface)
with patch('meshtastic.serial_interface.SerialInterface', return_value=iface) as mo:
anode = Node(mo, 'bar', noProto=True)
# Note: Have to do this next line because every call to MagicMock object/method returns a new magic mock
mo.localNode = anode
with caplog.at_level(logging.DEBUG):
anode.onResponseRequestCannedMessagePluginMessagePart1(packet)
assert re.search(r'onResponseRequestCannedMessagePluginMessagePart1', caplog.text, re.MULTILINE)
assert anode.cannedPluginMessagePart1 == 'foo1'
@pytest.mark.unit
def test_onResponseRequestCannedMessagePluginMesagePart2(caplog):
"""Test onResponseRequestCannedMessagePluginMessagePart2()"""
part2 = CannedMessagePluginMessagePart2()
part2.text = 'foo2'
msg2 = MagicMock(autospec=AdminMessage)
msg2.get_canned_message_plugin_part2_response = part2
packet = {
'from': 682968612,
'to': 682968612,
'decoded': {
'portnum': 'ADMIN_APP',
'payload': 'faked',
'requestId': 927039000,
'admin': {
'getCannedMessagePluginPart2Response': {'text': 'foo2'},
'raw': msg2
}
},
'id': 589440320,
'rxTime': 1642710843,
'hopLimit': 3,
'priority': 'RELIABLE',
'raw': 'faked',
'fromId': '!28b54624',
'toId': '!28b54624'
}
iface = MagicMock(autospec=SerialInterface)
with patch('meshtastic.serial_interface.SerialInterface', return_value=iface) as mo:
anode = Node(mo, 'bar', noProto=True)
# Note: Have to do this next line because every call to MagicMock object/method returns a new magic mock
mo.localNode = anode
with caplog.at_level(logging.DEBUG):
anode.onResponseRequestCannedMessagePluginMessagePart2(packet)
assert re.search(r'onResponseRequestCannedMessagePluginMessagePart2', caplog.text, re.MULTILINE)
assert anode.cannedPluginMessagePart2 == 'foo2'
@pytest.mark.unit
def test_onResponseRequestCannedMessagePluginMesagePart3(caplog):
"""Test onResponseRequestCannedMessagePluginMessagePart3()"""
part3 = CannedMessagePluginMessagePart3()
part3.text = 'foo3'
msg3 = MagicMock(autospec=AdminMessage)
msg3.get_canned_message_plugin_part3_response = part3
packet = {
'from': 682968612,
'to': 682968612,
'decoded': {
'portnum': 'ADMIN_APP',
'payload': 'faked',
'requestId': 927039000,
'admin': {
'getCannedMessagePluginPart3Response': {'text': 'foo3'},
'raw': msg3
}
},
'id': 589440320,
'rxTime': 1642710843,
'hopLimit': 3,
'priority': 'RELIABLE',
'raw': 'faked',
'fromId': '!28b54624',
'toId': '!28b54624'
}
iface = MagicMock(autospec=SerialInterface)
with patch('meshtastic.serial_interface.SerialInterface', return_value=iface) as mo:
anode = Node(mo, 'bar', noProto=True)
# Note: Have to do this next line because every call to MagicMock object/method returns a new magic mock
mo.localNode = anode
with caplog.at_level(logging.DEBUG):
anode.onResponseRequestCannedMessagePluginMessagePart3(packet)
assert re.search(r'onResponseRequestCannedMessagePluginMessagePart3', caplog.text, re.MULTILINE)
assert anode.cannedPluginMessagePart3 == 'foo3'
@pytest.mark.unit
def test_onResponseRequestCannedMessagePluginMesagePart4(caplog):
"""Test onResponseRequestCannedMessagePluginMessagePart4()"""
part4 = CannedMessagePluginMessagePart4()
part4.text = 'foo4'
msg4 = MagicMock(autospec=AdminMessage)
msg4.get_canned_message_plugin_part4_response = part4
packet = {
'from': 682968612,
'to': 682968612,
'decoded': {
'portnum': 'ADMIN_APP',
'payload': 'faked',
'requestId': 927039000,
'admin': {
'getCannedMessagePluginPart4Response': {'text': 'foo4'},
'raw': msg4
}
},
'id': 589440320,
'rxTime': 1642710843,
'hopLimit': 3,
'priority': 'RELIABLE',
'raw': 'faked',
'fromId': '!28b54624',
'toId': '!28b54624'
}
iface = MagicMock(autospec=SerialInterface)
with patch('meshtastic.serial_interface.SerialInterface', return_value=iface) as mo:
anode = Node(mo, 'bar', noProto=True)
# Note: Have to do this next line because every call to MagicMock object/method returns a new magic mock
mo.localNode = anode
with caplog.at_level(logging.DEBUG):
anode.onResponseRequestCannedMessagePluginMessagePart4(packet)
assert re.search(r'onResponseRequestCannedMessagePluginMessagePart4', caplog.text, re.MULTILINE)
assert anode.cannedPluginMessagePart4 == 'foo4'
@pytest.mark.unit
def test_onResponseRequestCannedMessagePluginMesagePart5(caplog):
"""Test onResponseRequestCannedMessagePluginMessagePart5()"""
part5 = CannedMessagePluginMessagePart5()
part5.text = 'foo5'
msg5 = MagicMock(autospec=AdminMessage)
msg5.get_canned_message_plugin_part5_response = part5
packet = {
'from': 682968612,
'to': 682968612,
'decoded': {
'portnum': 'ADMIN_APP',
'payload': 'faked',
'requestId': 927039000,
'admin': {
'getCannedMessagePluginPart5Response': {'text': 'foo5'},
'raw': msg5
}
},
'id': 589440320,
'rxTime': 1642710843,
'hopLimit': 3,
'priority': 'RELIABLE',
'raw': 'faked',
'fromId': '!28b54624',
'toId': '!28b54624'
}
iface = MagicMock(autospec=SerialInterface)
with patch('meshtastic.serial_interface.SerialInterface', return_value=iface) as mo:
anode = Node(mo, 'bar', noProto=True)
# Note: Have to do this next line because every call to MagicMock object/method returns a new magic mock
mo.localNode = anode
with caplog.at_level(logging.DEBUG):
anode.onResponseRequestCannedMessagePluginMessagePart5(packet)
assert re.search(r'onResponseRequestCannedMessagePluginMessagePart5', caplog.text, re.MULTILINE)
assert anode.cannedPluginMessagePart5 == 'foo5'
@pytest.mark.unit
def test_onResponseRequestCannedMessagePluginMesagePart1_error(caplog, capsys):
"""Test onResponseRequestCannedMessagePluginMessagePart1() with error"""
packet = {
'decoded': {
'routing': {
'errorReason': 'some made up error',
},
},
}
iface = MagicMock(autospec=SerialInterface)
with patch('meshtastic.serial_interface.SerialInterface', return_value=iface) as mo:
anode = Node(mo, 'bar', noProto=True)
# Note: Have to do this next line because every call to MagicMock object/method returns a new magic mock
mo.localNode = anode
with caplog.at_level(logging.DEBUG):
anode.onResponseRequestCannedMessagePluginMessagePart1(packet)
assert re.search(r'onResponseRequestCannedMessagePluginMessagePart1', caplog.text, re.MULTILINE)
out, err = capsys.readouterr()
assert re.search(r'Error on response', out)
assert err == ''
@pytest.mark.unit
def test_onResponseRequestCannedMessagePluginMesagePart2_error(caplog, capsys):
"""Test onResponseRequestCannedMessagePluginMessagePart2() with error"""
packet = {
'decoded': {
'routing': {
'errorReason': 'some made up error',
},
},
}
iface = MagicMock(autospec=SerialInterface)
with patch('meshtastic.serial_interface.SerialInterface', return_value=iface) as mo:
anode = Node(mo, 'bar', noProto=True)
# Note: Have to do this next line because every call to MagicMock object/method returns a new magic mock
mo.localNode = anode
with caplog.at_level(logging.DEBUG):
anode.onResponseRequestCannedMessagePluginMessagePart2(packet)
assert re.search(r'onResponseRequestCannedMessagePluginMessagePart2', caplog.text, re.MULTILINE)
out, err = capsys.readouterr()
assert re.search(r'Error on response', out)
assert err == ''
@pytest.mark.unit
def test_onResponseRequestCannedMessagePluginMesagePart3_error(caplog, capsys):
"""Test onResponseRequestCannedMessagePluginMessagePart3() with error"""
packet = {
'decoded': {
'routing': {
'errorReason': 'some made up error',
},
},
}
iface = MagicMock(autospec=SerialInterface)
with patch('meshtastic.serial_interface.SerialInterface', return_value=iface) as mo:
anode = Node(mo, 'bar', noProto=True)
# Note: Have to do this next line because every call to MagicMock object/method returns a new magic mock
mo.localNode = anode
with caplog.at_level(logging.DEBUG):
anode.onResponseRequestCannedMessagePluginMessagePart3(packet)
assert re.search(r'onResponseRequestCannedMessagePluginMessagePart3', caplog.text, re.MULTILINE)
out, err = capsys.readouterr()
assert re.search(r'Error on response', out)
assert err == ''
@pytest.mark.unit
def test_onResponseRequestCannedMessagePluginMesagePart4_error(caplog, capsys):
"""Test onResponseRequestCannedMessagePluginMessagePart4() with error"""
packet = {
'decoded': {
'routing': {
'errorReason': 'some made up error',
},
},
}
iface = MagicMock(autospec=SerialInterface)
with patch('meshtastic.serial_interface.SerialInterface', return_value=iface) as mo:
anode = Node(mo, 'bar', noProto=True)
# Note: Have to do this next line because every call to MagicMock object/method returns a new magic mock
mo.localNode = anode
with caplog.at_level(logging.DEBUG):
anode.onResponseRequestCannedMessagePluginMessagePart4(packet)
assert re.search(r'onResponseRequestCannedMessagePluginMessagePart4', caplog.text, re.MULTILINE)
out, err = capsys.readouterr()
assert re.search(r'Error on response', out)
assert err == ''
@pytest.mark.unit
def test_onResponseRequestCannedMessagePluginMesagePart5_error(caplog, capsys):
"""Test onResponseRequestCannedMessagePluginMessagePart5() with error"""
packet = {
'decoded': {
'routing': {
'errorReason': 'some made up error',
},
},
}
iface = MagicMock(autospec=SerialInterface)
with patch('meshtastic.serial_interface.SerialInterface', return_value=iface) as mo:
anode = Node(mo, 'bar', noProto=True)
# Note: Have to do this next line because every call to MagicMock object/method returns a new magic mock
mo.localNode = anode
with caplog.at_level(logging.DEBUG):
anode.onResponseRequestCannedMessagePluginMessagePart5(packet)
assert re.search(r'onResponseRequestCannedMessagePluginMessagePart5', caplog.text, re.MULTILINE)
out, err = capsys.readouterr()
assert re.search(r'Error on response', out)
assert err == ''
@pytest.mark.unit
def test_onResponseRequestChannel(caplog):
"""Test onResponseRequestChannel()"""