Merge pull request #890 from h3lix1/traffic_module

Add traffic management module to the python libraries
This commit is contained in:
Ian McEwen
2026-03-02 10:00:12 -07:00
committed by GitHub
4 changed files with 52 additions and 0 deletions

View File

@@ -1455,6 +1455,10 @@ class MeshInterface: # pylint: disable=R0902
self.localNode.moduleConfig.paxcounter.CopyFrom(
fromRadio.moduleConfig.paxcounter
)
elif fromRadio.moduleConfig.HasField("traffic_management"):
self.localNode.moduleConfig.traffic_management.CopyFrom(
fromRadio.moduleConfig.traffic_management
)
else:
logger.debug("Unexpected FromRadio payload")

View File

@@ -244,6 +244,8 @@ class Node:
p.set_module_config.ambient_lighting.CopyFrom(self.moduleConfig.ambient_lighting)
elif config_name == "paxcounter":
p.set_module_config.paxcounter.CopyFrom(self.moduleConfig.paxcounter)
elif config_name == "traffic_management":
p.set_module_config.traffic_management.CopyFrom(self.moduleConfig.traffic_management)
else:
our_exit(f"Error: No valid config with name {config_name}")

View File

@@ -0,0 +1,22 @@
"""Meshtastic unit tests for traffic management handling in mesh_interface.py."""
import pytest
from ..mesh_interface import MeshInterface
from ..protobuf import mesh_pb2
@pytest.mark.unit
@pytest.mark.usefixtures("reset_mt_config")
def test_handleFromRadio_with_traffic_management_module_config():
"""Test _handleFromRadio with moduleConfig.traffic_management."""
iface = MeshInterface(noProto=True)
from_radio = mesh_pb2.FromRadio()
from_radio.moduleConfig.traffic_management.enabled = True
from_radio.moduleConfig.traffic_management.rate_limit_enabled = True
iface._handleFromRadio(from_radio.SerializeToString())
assert iface.localNode.moduleConfig.traffic_management.enabled is True
assert iface.localNode.moduleConfig.traffic_management.rate_limit_enabled is True
iface.close()

View File

@@ -794,6 +794,30 @@ def test_writeConfig_with_no_radioConfig(capsys):
assert err == ""
@pytest.mark.unit
@pytest.mark.usefixtures("reset_mt_config")
def test_writeConfig_traffic_management():
"""Test writeConfig with traffic_management module config."""
iface = MagicMock(autospec=SerialInterface)
anode = Node(iface, 123, noProto=True)
anode.moduleConfig.traffic_management.enabled = True
anode.moduleConfig.traffic_management.rate_limit_enabled = True
sent_admin = []
def capture_send(p, *args, **kwargs): # pylint: disable=W0613
sent_admin.append(p)
with patch.object(anode, "_sendAdmin", side_effect=capture_send):
anode.writeConfig("traffic_management")
assert len(sent_admin) == 1
assert sent_admin[0].HasField("set_module_config")
assert sent_admin[0].set_module_config.HasField("traffic_management")
assert sent_admin[0].set_module_config.traffic_management.enabled is True
assert sent_admin[0].set_module_config.traffic_management.rate_limit_enabled is True
# TODO
# @pytest.mark.unit
# def test_writeConfig(caplog):