From 105276f98ebf2e2028f44a136e78dc347fa6cf86 Mon Sep 17 00:00:00 2001 From: Mike Kinney Date: Fri, 31 Dec 2021 12:17:04 -0800 Subject: [PATCH] add unit tests for _shouldFilterPacket() --- meshtastic/tests/test_tunnel.py | 135 ++++++++++++++++++++++++++++++++ meshtastic/tunnel.py | 6 +- 2 files changed, 139 insertions(+), 2 deletions(-) diff --git a/meshtastic/tests/test_tunnel.py b/meshtastic/tests/test_tunnel.py index d7dc611..6b9411c 100644 --- a/meshtastic/tests/test_tunnel.py +++ b/meshtastic/tests/test_tunnel.py @@ -97,3 +97,138 @@ def test_onTunnelReceive_from_someone_else(mock_platform_system, caplog, reset_g Globals.getInstance().set_tunnelInstance(tun) onTunnelReceive(packet, iface) assert re.search(r'in onTunnelReceive', caplog.text, re.MULTILINE) + + +@pytest.mark.unit +@patch('platform.system') +def test_shouldFilterPacket_random(mock_platform_system, caplog, reset_globals, iface_with_nodes): + """Test _shouldFilterPacket()""" + iface = iface_with_nodes + iface.noProto = True + # random packet + packet = b'1234567890123456789012345678901234567890' + a_mock = MagicMock() + a_mock.return_value = 'Linux' + mock_platform_system.side_effect = a_mock + with caplog.at_level(logging.DEBUG): + with patch('socket.socket'): + tun = Tunnel(iface) + ignore = tun._shouldFilterPacket(packet) + assert not ignore + + +@pytest.mark.unit +@patch('platform.system') +def test_shouldFilterPacket_in_blacklist(mock_platform_system, caplog, reset_globals, iface_with_nodes): + """Test _shouldFilterPacket()""" + iface = iface_with_nodes + iface.noProto = True + # faked IGMP + packet = b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x02\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00' + a_mock = MagicMock() + a_mock.return_value = 'Linux' + mock_platform_system.side_effect = a_mock + with caplog.at_level(logging.DEBUG): + with patch('socket.socket'): + tun = Tunnel(iface) + ignore = tun._shouldFilterPacket(packet) + assert ignore + + +@pytest.mark.unit +@patch('platform.system') +def test_shouldFilterPacket_icmp(mock_platform_system, caplog, reset_globals, iface_with_nodes): + """Test _shouldFilterPacket()""" + iface = iface_with_nodes + iface.noProto = True + # faked ICMP + packet = b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00' + a_mock = MagicMock() + a_mock.return_value = 'Linux' + mock_platform_system.side_effect = a_mock + with caplog.at_level(logging.DEBUG): + with patch('socket.socket'): + tun = Tunnel(iface) + ignore = tun._shouldFilterPacket(packet) + assert re.search(r'forwarding ICMP message', caplog.text, re.MULTILINE) + assert not ignore + + +@pytest.mark.unit +@patch('platform.system') +def test_shouldFilterPacket_udp(mock_platform_system, caplog, reset_globals, iface_with_nodes): + """Test _shouldFilterPacket()""" + iface = iface_with_nodes + iface.noProto = True + # faked UDP + packet = b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x11\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00' + a_mock = MagicMock() + a_mock.return_value = 'Linux' + mock_platform_system.side_effect = a_mock + with caplog.at_level(logging.DEBUG): + with patch('socket.socket'): + tun = Tunnel(iface) + ignore = tun._shouldFilterPacket(packet) + assert re.search(r'forwarding udp', caplog.text, re.MULTILINE) + assert not ignore + + +@pytest.mark.unit +@patch('platform.system') +def test_shouldFilterPacket_udp_blacklisted(mock_platform_system, caplog, reset_globals, iface_with_nodes): + """Test _shouldFilterPacket()""" + iface = iface_with_nodes + iface.noProto = True + # faked UDP + packet = b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x11\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x07\x6c\x07\x6c\x00\x00\x00' + a_mock = MagicMock() + a_mock.return_value = 'Linux' + mock_platform_system.side_effect = a_mock + # Note: custom logging level + LOG_TRACE = 5 + with caplog.at_level(LOG_TRACE): + with patch('socket.socket'): + tun = Tunnel(iface) + ignore = tun._shouldFilterPacket(packet) + assert re.search(r'ignoring blacklisted UDP', caplog.text, re.MULTILINE) + assert ignore + + +@pytest.mark.unit +@patch('platform.system') +def test_shouldFilterPacket_tcp(mock_platform_system, caplog, reset_globals, iface_with_nodes): + """Test _shouldFilterPacket()""" + iface = iface_with_nodes + iface.noProto = True + # faked TCP + packet = b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x06\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00' + a_mock = MagicMock() + a_mock.return_value = 'Linux' + mock_platform_system.side_effect = a_mock + with caplog.at_level(logging.DEBUG): + with patch('socket.socket'): + tun = Tunnel(iface) + ignore = tun._shouldFilterPacket(packet) + assert re.search(r'forwarding tcp', caplog.text, re.MULTILINE) + assert not ignore + + +@pytest.mark.unit +@patch('platform.system') +def test_shouldFilterPacket_tcp_blacklisted(mock_platform_system, caplog, reset_globals, iface_with_nodes): + """Test _shouldFilterPacket()""" + iface = iface_with_nodes + iface.noProto = True + # faked TCP + packet = b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x06\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x17\x0c\x17\x0c\x00\x00\x00' + a_mock = MagicMock() + a_mock.return_value = 'Linux' + mock_platform_system.side_effect = a_mock + # Note: custom logging level + LOG_TRACE = 5 + with caplog.at_level(LOG_TRACE): + with patch('socket.socket'): + tun = Tunnel(iface) + ignore = tun._shouldFilterPacket(packet) + assert re.search(r'ignoring blacklisted TCP', caplog.text, re.MULTILINE) + assert ignore diff --git a/meshtastic/tunnel.py b/meshtastic/tunnel.py index d78c2db..1fff1e1 100644 --- a/meshtastic/tunnel.py +++ b/meshtastic/tunnel.py @@ -66,7 +66,9 @@ class Tunnel: } """A list of TCP services to block""" - self.tcpBlacklist = {} + self.tcpBlacklist = { + 5900, # VNC (Note: Only adding for testing purposes.) + } """A list of protocols we ignore""" self.protocolBlacklist = { @@ -117,7 +119,7 @@ class Tunnel: logging.debug(f"Received mesh tunnel message type={type(p)} len={len(p)}") # we don't really need to check for filtering here (sender should have checked), # but this provides useful debug printing on types of packets received - if not self.iface.noProto: # could move this one line down later + if not self.iface.noProto: if not self._shouldFilterPacket(p): self.tun.write(p)