diff --git a/meshtastic/__main__.py b/meshtastic/__main__.py index 6fe4da6..83911ba 100644 --- a/meshtastic/__main__.py +++ b/meshtastic/__main__.py @@ -27,6 +27,7 @@ def onReceive(packet, interface): args = our_globals.get_args() try: d = packet.get('decoded') + logging.debug(f'd:{d}') # Exit once we receive a reply if args and args.sendtext and packet["to"] == interface.myInfo.my_node_num and d["portnum"] == portnums_pb2.PortNum.TEXT_MESSAGE_APP: @@ -243,8 +244,12 @@ def onConnected(interface): channelIndex = 0 if args.ch_index is not None: channelIndex = int(args.ch_index) - print(f"Sending text message {args.sendtext} to {args.destOrAll}") - interface.sendText(args.sendtext, args.destOrAll, wantAck=True, channelIndex=channelIndex) + ch = interface.getChannelByChannelIndex(channelIndex) + if ch and ch.role != channel_pb2.Channel.Role.DISABLED: + print(f"Sending text message {args.sendtext} to {args.destOrAll} on channelIndex:{channelIndex}") + interface.sendText(args.sendtext, args.destOrAll, wantAck=True, channelIndex=channelIndex) + else: + meshtastic.util.our_exit(f"Warning: {channelIndex} is not a valid channel. Channel must not be DISABLED.") if args.sendping: payload = str.encode("test string") @@ -565,7 +570,8 @@ def common(): our_globals = Globals.getInstance() args = our_globals.get_args() parser = our_globals.get_parser() - logging.basicConfig(level=logging.DEBUG if args.debug else logging.INFO) + logging.basicConfig(level=logging.DEBUG if args.debug else logging.INFO, + format='%(levelname)s file:%(filename)s %(funcName)s line:%(lineno)s %(message)s') if len(sys.argv) == 1: parser.print_help(sys.stderr) @@ -688,7 +694,7 @@ def initParser(): "--seturl", help="Set a channel URL", action="store") parser.add_argument( - "--ch-index", help="Set the specified channel index", action="store") + "--ch-index", help="Set the specified channel index. Channels start at 0 (0 is the PRIMARY channel).", action="store") parser.add_argument( "--ch-add", help="Add a secondary channel, you must specify a channel name", default=None) @@ -738,7 +744,7 @@ def initParser(): "--dest", help="The destination node id for any sent commands, if not set '^all' or '^local' is assumed as appropriate", default=None) parser.add_argument( - "--sendtext", help="Send a text message") + "--sendtext", help="Send a text message. Can specify a destination '--dest' and/or channel index '--ch-index'.") parser.add_argument( "--sendping", help="Send a ping message (which requests a reply)", action="store_true") @@ -746,9 +752,6 @@ def initParser(): parser.add_argument( "--reboot", help="Tell the destination node to reboot", action="store_true") - # parser.add_argument( - # "--repeat", help="Normally the send commands send only one message, use this option to request repeated sends") - parser.add_argument( "--reply", help="Reply to received messages", action="store_true") diff --git a/meshtastic/node.py b/meshtastic/node.py index 3158d57..003c945 100644 --- a/meshtastic/node.py +++ b/meshtastic/node.py @@ -90,6 +90,16 @@ class Node: self._sendAdmin(p, adminIndex=adminIndex) logging.debug(f"Wrote channel {channelIndex}") + def getChannelByChannelIndex(self, channelIndex): + """Get channel by channelIndex + channelIndex: number, typically 0-7; based on max number channels + returns: None if there is no channel found + """ + ch = None + if self.channels and 0 <= channelIndex < len(self.channels): + ch = self.channels[channelIndex] + return ch + def deleteChannel(self, channelIndex): """Delete the specifed channelIndex and shift other channels up""" ch = self.channels[channelIndex] diff --git a/meshtastic/tests/test_main.py b/meshtastic/tests/test_main.py index f8a6734..c83cbe5 100644 --- a/meshtastic/tests/test_main.py +++ b/meshtastic/tests/test_main.py @@ -418,6 +418,48 @@ def test_main_sendtext(capsys, reset_globals): mo.assert_called() +@pytest.mark.unit +def test_main_sendtext_with_channel(capsys, reset_globals): + """Test --sendtext""" + sys.argv = ['', '--sendtext', 'hello', '--ch-index', '1'] + Globals.getInstance().set_args(sys.argv) + + iface = MagicMock(autospec=SerialInterface) + def mock_sendText(text, dest, wantAck, channelIndex): + print('inside mocked sendText') + iface.sendText.side_effect = mock_sendText + + with patch('meshtastic.serial_interface.SerialInterface', return_value=iface) as mo: + main() + out, err = capsys.readouterr() + assert re.search(r'Connected to radio', out, re.MULTILINE) + assert re.search(r'Sending text message', out, re.MULTILINE) + assert re.search(r'on channelIndex:1', out, re.MULTILINE) + assert re.search(r'inside mocked sendText', out, re.MULTILINE) + assert err == '' + mo.assert_called() + + +@pytest.mark.unit +def test_main_sendtext_with_invalid_channel(capsys, reset_globals): + """Test --sendtext""" + sys.argv = ['', '--sendtext', 'hello', '--ch-index', '-1'] + Globals.getInstance().set_args(sys.argv) + + iface = MagicMock(autospec=SerialInterface) + iface.getChannelByChannelIndex.return_value = None + with patch('meshtastic.serial_interface.SerialInterface', return_value=iface) as mo: + #mo.getChannelByChannelIndex.return_value = None + with pytest.raises(SystemExit) as pytest_wrapped_e: + main() + assert pytest_wrapped_e.type == SystemExit + assert pytest_wrapped_e.value.code == 1 + out, err = capsys.readouterr() + assert re.search(r'is not a valid channel', out, re.MULTILINE) + assert err == '' + mo.assert_called() + + @pytest.mark.unit def test_main_sendtext_with_dest(capsys, reset_globals): """Test --sendtext with --dest""" @@ -1201,7 +1243,7 @@ def test_main_onConnection(reset_globals, capsys): @pytest.mark.unit def test_main_export_config(reset_globals, capsys): - """Test export_config""" + """Test export_config() function directly""" iface = MagicMock(autospec=SerialInterface) with patch('meshtastic.serial_interface.SerialInterface', return_value=iface) as mo: mo.getLongName.return_value = 'foo' @@ -1229,3 +1271,19 @@ position_flags: 35""" assert re.search(r"fixed_position: 'true'", out, re.MULTILINE) assert re.search(r"position_flags: 35", out, re.MULTILINE) assert err == '' + + +@pytest.mark.unit +def test_main_export_config_called_from_main(capsys, reset_globals): + """Test --export-config""" + sys.argv = ['', '--export-config'] + Globals.getInstance().set_args(sys.argv) + + iface = MagicMock(autospec=SerialInterface) + with patch('meshtastic.serial_interface.SerialInterface', return_value=iface) as mo: + main() + out, err = capsys.readouterr() + assert re.search(r'Connected to radio', out, re.MULTILINE) + assert re.search(r'# start of Meshtastic configure yaml', out, re.MULTILINE) + assert err == '' + mo.assert_called() diff --git a/meshtastic/tests/test_node.py b/meshtastic/tests/test_node.py index e9b2ff8..02e0f19 100644 --- a/meshtastic/tests/test_node.py +++ b/meshtastic/tests/test_node.py @@ -186,6 +186,35 @@ def test_showChannels(capsys): assert err == '' +@pytest.mark.unit +def test_getChannelByChannelIndex(): + """Test getChannelByChannelIndex()""" + anode = Node('foo', 'bar') + + channel1 = Channel(index=1, role=1) # primary channel + channel2 = Channel(index=2, role=2) # secondary channel + channel3 = Channel(index=3, role=0) + channel4 = Channel(index=4, role=0) + channel5 = Channel(index=5, role=0) + channel6 = Channel(index=6, role=0) + channel7 = Channel(index=7, role=0) + channel8 = Channel(index=8, role=0) + + channels = [ channel1, channel2, channel3, channel4, channel5, channel6, channel7, channel8 ] + + anode.channels = channels + + # test primary + assert anode.getChannelByChannelIndex(0) is not None + # test secondary + assert anode.getChannelByChannelIndex(1) is not None + # test disabled + assert anode.getChannelByChannelIndex(2) is not None + # test invalid values + assert anode.getChannelByChannelIndex(-1) is None + assert anode.getChannelByChannelIndex(9) is None + + @pytest.mark.unit def test_deleteChannel_try_to_delete_primary_channel(capsys): """Try to delete primary channel.""" @@ -215,7 +244,6 @@ def test_deleteChannel_try_to_delete_primary_channel(capsys): assert re.search(r'Warning: Only SECONDARY channels can be deleted', out, re.MULTILINE) assert err == '' - @pytest.mark.unit def test_deleteChannel_secondary(): """Try to delete a secondary channel."""