Module meshtastic.tests.test_node

Meshtastic unit tests for node.py

Expand source code
"""Meshtastic unit tests for node.py"""

import re
import logging

from unittest.mock import patch, MagicMock
import pytest

from ..node import Node
from ..serial_interface import SerialInterface
from ..admin_pb2 import AdminMessage
from ..channel_pb2 import Channel
from ..radioconfig_pb2 import RadioConfig


@pytest.mark.unit
def test_node(capsys):
    """Check that a freshly built Node can print its channels and info.

    The node is handed an empty RadioConfig, showChannels() and showInfo()
    are called, and the expected headings must show up on stdout while
    stderr stays empty.
    """
    anode = Node('foo', 'bar')
    anode.radioConfig = RadioConfig()
    anode.showChannels()
    anode.showInfo()
    captured = capsys.readouterr()
    for heading in (r'Preferences', r'Channels', r'Primary channel URL'):
        assert re.search(heading, captured.out)
    assert captured.err == ''


@pytest.mark.unit
def test_node_reqquestConfig():
    """Smoke-test requestConfig() on a Node built around mocks.

    There are no assertions: the test passes as long as the call does not
    raise.
    """
    iface = MagicMock(autospec=SerialInterface)
    amesg = MagicMock(autospec=AdminMessage)
    serial_patch = patch('meshtastic.serial_interface.SerialInterface', return_value=iface)
    admin_patch = patch('meshtastic.admin_pb2.AdminMessage', return_value=amesg)
    with serial_patch as mo, admin_patch:
        anode = Node(mo, 'bar')
        anode.requestConfig()


@pytest.mark.unit
def test_setOwner_and_team(caplog):
    """Call setOwner() with long name, short name and team; verify the debug log."""
    anode = Node('foo', 'bar', noProto=True)
    with caplog.at_level(logging.DEBUG):
        anode.setOwner(long_name='Test123', short_name='123', team=1)
    expected_entries = (
        r'p.set_owner.long_name:Test123:',
        r'p.set_owner.short_name:123:',
        r'p.set_owner.is_licensed:False',
        r'p.set_owner.team:1',
    )
    for pattern in expected_entries:
        assert re.search(pattern, caplog.text, re.MULTILINE)


@pytest.mark.unit
def test_setOwner_no_short_name(caplog):
    """Call setOwner() with only a long name; the log must show short name 'Tst'."""
    anode = Node('foo', 'bar', noProto=True)
    with caplog.at_level(logging.DEBUG):
        anode.setOwner(long_name='Test123')
    expected_entries = (
        r'p.set_owner.long_name:Test123:',
        r'p.set_owner.short_name:Tst:',
        r'p.set_owner.is_licensed:False',
        r'p.set_owner.team:0',
    )
    for pattern in expected_entries:
        assert re.search(pattern, caplog.text, re.MULTILINE)


@pytest.mark.unit
def test_setOwner_no_short_name_and_long_name_is_short(caplog):
    """Call setOwner() with a three-letter long name; the short name must equal it."""
    anode = Node('foo', 'bar', noProto=True)
    with caplog.at_level(logging.DEBUG):
        anode.setOwner(long_name='Tnt')
    expected_entries = (
        r'p.set_owner.long_name:Tnt:',
        r'p.set_owner.short_name:Tnt:',
        r'p.set_owner.is_licensed:False',
        r'p.set_owner.team:0',
    )
    for pattern in expected_entries:
        assert re.search(pattern, caplog.text, re.MULTILINE)


@pytest.mark.unit
def test_setOwner_no_short_name_and_long_name_has_words(caplog):
    """Call setOwner() with a multi-word long name and is_licensed=True.

    The log is expected to show the short name 'ABC' for long name 'A B C'.
    """
    anode = Node('foo', 'bar', noProto=True)
    with caplog.at_level(logging.DEBUG):
        anode.setOwner(long_name='A B C', is_licensed=True)
    expected_entries = (
        r'p.set_owner.long_name:A B C:',
        r'p.set_owner.short_name:ABC:',
        r'p.set_owner.is_licensed:True',
        r'p.set_owner.team:0',
    )
    for pattern in expected_entries:
        assert re.search(pattern, caplog.text, re.MULTILINE)


@pytest.mark.unit
def test_exitSimulator(caplog):
    """exitSimulator() must leave an 'in exitSimulator' entry in the debug log."""
    anode = Node('foo', 'bar', noProto=True)
    with caplog.at_level(logging.DEBUG):
        anode.exitSimulator()
    found = re.search(r'in exitSimulator', caplog.text, re.MULTILINE)
    assert found


@pytest.mark.unit
def test_reboot(caplog):
    """reboot() must leave a 'Telling node to reboot' entry in the debug log."""
    anode = Node('foo', 'bar', noProto=True)
    with caplog.at_level(logging.DEBUG):
        anode.reboot()
    found = re.search(r'Telling node to reboot', caplog.text, re.MULTILINE)
    assert found


@pytest.mark.unit
def test_setURL_empty_url():
    """Passing an empty URL to setURL() must exit with status code 1."""
    anode = Node('foo', 'bar', noProto=True)
    with pytest.raises(SystemExit) as exit_info:
        anode.setURL('')
    assert exit_info.type is SystemExit
    assert exit_info.value.code == 1


@pytest.mark.unit
def test_setURL_valid_URL(caplog):
    """Feed setURL() a URL carrying channel settings and inspect the debug log."""
    iface = MagicMock(autospec=SerialInterface)
    url = "https://www.meshtastic.org/d/#CgUYAyIBAQ"
    with caplog.at_level(logging.DEBUG):
        anode = Node(iface, 'bar', noProto=True)
        # Placeholder values; the test only needs these attributes to be set.
        anode.radioConfig = 'baz'
        anode.channels = ['zoo']
        anode.setURL(url)
    expected_entries = (
        r'Channel i:0',
        r'modem_config: Bw125Cr48Sf4096',
        r'psk: "\\001"',
        r'role: PRIMARY',
    )
    for pattern in expected_entries:
        assert re.search(pattern, caplog.text, re.MULTILINE)


@pytest.mark.unit
def test_setURL_valid_URL_but_no_settings(caplog):
    """A URL with nothing after the '#' must make setURL() exit with status 1."""
    iface = MagicMock(autospec=SerialInterface)
    url = "https://www.meshtastic.org/d/#"
    with pytest.raises(SystemExit) as exit_info:
        anode = Node(iface, 'bar', noProto=True)
        anode.radioConfig = 'baz'
        anode.setURL(url)
    assert exit_info.type is SystemExit
    assert exit_info.value.code == 1


@pytest.mark.unit
def test_showChannels(capsys):
    """Print a primary, a secondary and six disabled channels via showChannels().

    Stdout must describe both active channels (role, psk kind and the JSON
    rendering of their settings); stderr must stay empty.
    """
    anode = Node('foo', 'bar')

    # role: 0=Disabled, 1=Primary, 2=Secondary
    # modem_config: 0-5
    primary = Channel(index=1, role=1)
    primary.settings.modem_config = 3
    primary.settings.psk = b'\x01'

    secondary = Channel(index=2, role=2)
    secondary.settings.psk = b'\x8a\x94y\x0e\xc6\xc9\x1e5\x91\x12@\xa60\xa8\xb43\x87\x00\xf2K\x0e\xe7\x7fAz\xcd\xf5\xb0\x900\xa84'
    secondary.settings.name = 'testing'

    disabled = [Channel(index=idx, role=0) for idx in range(3, 9)]

    anode.channels = [primary, secondary] + disabled
    anode.showChannels()
    captured = capsys.readouterr()
    expected_output = (
        r'Channels:',
        # primary channel
        r'Primary channel URL',
        r'PRIMARY psk=default ',
        r'"modemConfig": "Bw125Cr48Sf4096"',
        r'"psk": "AQ=="',
        # secondary channel
        r'SECONDARY psk=secret ',
        r'"psk": "ipR5DsbJHjWREkCmMKi0M4cA8ksO539Bes31sJAwqDQ="',
    )
    for pattern in expected_output:
        assert re.search(pattern, captured.out, re.MULTILINE)
    assert captured.err == ''


@pytest.mark.unit
def test_getChannelByChannelIndex():
    """getChannelByChannelIndex() returns a channel for valid positions, else None."""
    anode = Node('foo', 'bar')

    active = [
        Channel(index=1, role=1),  # primary channel
        Channel(index=2, role=2),  # secondary channel
    ]
    disabled = [Channel(index=idx, role=0) for idx in range(3, 9)]
    anode.channels = active + disabled

    # positions 0, 1 and 2 hold the primary, the secondary and a disabled channel
    for position in (0, 1, 2):
        assert anode.getChannelByChannelIndex(position) is not None
    # invalid values
    for position in (-1, 9):
        assert anode.getChannelByChannelIndex(position) is None


@pytest.mark.unit
def test_deleteChannel_try_to_delete_primary_channel(capsys):
    """deleteChannel(0) on the primary channel must warn and exit with status 1."""
    anode = Node('foo', 'bar')

    primary = Channel(index=1, role=1)
    primary.settings.modem_config = 3
    primary.settings.psk = b'\x01'

    # no secondary channels: everything after the primary is disabled
    disabled = [Channel(index=idx, role=0) for idx in range(2, 9)]

    anode.channels = [primary] + disabled
    with pytest.raises(SystemExit) as exit_info:
        anode.deleteChannel(0)
    assert exit_info.type is SystemExit
    assert exit_info.value.code == 1
    captured = capsys.readouterr()
    assert re.search(r'Warning: Only SECONDARY channels can be deleted', captured.out, re.MULTILINE)
    assert captured.err == ''

@pytest.mark.unit
def test_deleteChannel_secondary():
    """Delete the only secondary channel and check the resulting channel list.

    After deleteChannel(1) the list must still hold eight channels, the
    primary must keep its modem_config and no remaining slot may be named.
    """
    primary = Channel(index=1, role=1)
    primary.settings.modem_config = 3
    primary.settings.psk = b'\x01'

    secondary = Channel(index=2, role=2)
    secondary.settings.psk = b'\x8a\x94y\x0e\xc6\xc9\x1e5\x91\x12@\xa60\xa8\xb43\x87\x00\xf2K\x0e\xe7\x7fAz\xcd\xf5\xb0\x900\xa84'
    secondary.settings.name = 'testing'

    channels = [primary, secondary] + [Channel(index=idx, role=0) for idx in range(3, 9)]

    iface = MagicMock(autospec=SerialInterface)
    with patch('meshtastic.serial_interface.SerialInterface', return_value=iface) as mo:
        mo.localNode.getChannelByName.return_value = None
        mo.myInfo.max_channels = 8
        anode = Node(mo, 'bar', noProto=True)

        # The node gets the very list object the assertions below inspect.
        anode.channels = channels
        assert len(anode.channels) == 8
        assert channels[0].settings.modem_config == 3
        assert channels[1].settings.name == 'testing'
        for slot in range(2, 8):
            assert channels[slot].settings.name == ''

        anode.deleteChannel(1)

        assert len(anode.channels) == 8
        assert channels[0].settings.modem_config == 3
        for slot in range(1, 8):
            assert channels[slot].settings.name == ''


@pytest.mark.unit
def test_deleteChannel_secondary_with_admin_channel_after_testing():
    """Delete 'testing' when an 'admin' channel sits right after it.

    After deleteChannel(1) the 'admin' channel is expected at position 1
    and every later slot must be unnamed.
    """
    primary = Channel(index=1, role=1)
    primary.settings.modem_config = 3
    primary.settings.psk = b'\x01'

    testing = Channel(index=2, role=2)
    testing.settings.psk = b'\x8a\x94y\x0e\xc6\xc9\x1e5\x91\x12@\xa60\xa8\xb43\x87\x00\xf2K\x0e\xe7\x7fAz\xcd\xf5\xb0\x900\xa84'
    testing.settings.name = 'testing'

    admin = Channel(index=3, role=2)
    admin.settings.name = 'admin'

    channels = [primary, testing, admin] + [Channel(index=idx, role=0) for idx in range(4, 9)]

    iface = MagicMock(autospec=SerialInterface)
    with patch('meshtastic.serial_interface.SerialInterface', return_value=iface) as mo:
        mo.localNode.getChannelByName.return_value = None
        mo.myInfo.max_channels = 8
        anode = Node(mo, 'bar', noProto=True)

        # Make the mocked interface report this node as its localNode.
        mo.localNode = anode

        assert mo.localNode == anode

        # The node gets the very list object the assertions below inspect.
        anode.channels = channels
        assert len(anode.channels) == 8
        assert channels[0].settings.modem_config == 3
        assert channels[1].settings.name == 'testing'
        assert channels[2].settings.name == 'admin'
        for slot in range(3, 8):
            assert channels[slot].settings.name == ''

        anode.deleteChannel(1)

        assert len(anode.channels) == 8
        assert channels[0].settings.modem_config == 3
        assert channels[1].settings.name == 'admin'
        for slot in range(2, 8):
            assert channels[slot].settings.name == ''


@pytest.mark.unit
def test_deleteChannel_secondary_with_admin_channel_before_testing():
    """Delete 'testing' when an 'admin' channel sits right before it.

    After deleteChannel(2) the 'admin' channel must still be at position 1
    and every later slot must be unnamed.
    """
    primary = Channel(index=1, role=1)
    primary.settings.modem_config = 3
    primary.settings.psk = b'\x01'

    admin = Channel(index=2, role=2)
    admin.settings.psk = b'\x8a\x94y\x0e\xc6\xc9\x1e5\x91\x12@\xa60\xa8\xb43\x87\x00\xf2K\x0e\xe7\x7fAz\xcd\xf5\xb0\x900\xa84'
    admin.settings.name = 'admin'

    testing = Channel(index=3, role=2)
    testing.settings.name = 'testing'

    channels = [primary, admin, testing] + [Channel(index=idx, role=0) for idx in range(4, 9)]

    iface = MagicMock(autospec=SerialInterface)
    with patch('meshtastic.serial_interface.SerialInterface', return_value=iface) as mo:
        mo.localNode.getChannelByName.return_value = None
        mo.myInfo.max_channels = 8
        anode = Node(mo, 'bar', noProto=True)

        # The node gets the very list object the assertions below inspect.
        anode.channels = channels
        assert len(anode.channels) == 8
        assert channels[0].settings.modem_config == 3
        assert channels[1].settings.name == 'admin'
        assert channels[2].settings.name == 'testing'
        for slot in range(3, 8):
            assert channels[slot].settings.name == ''

        anode.deleteChannel(2)

        assert len(anode.channels) == 8
        assert channels[0].settings.modem_config == 3
        assert channels[1].settings.name == 'admin'
        for slot in range(2, 8):
            assert channels[slot].settings.name == ''


@pytest.mark.unit
def test_getChannelByName(capsys):
    """getChannelByName('admin') must return the channel carrying that name."""
    anode = Node('foo', 'bar')

    primary = Channel(index=1, role=1)
    primary.settings.modem_config = 3
    primary.settings.psk = b'\x01'

    admin = Channel(index=2, role=2)
    admin.settings.psk = b'\x8a\x94y\x0e\xc6\xc9\x1e5\x91\x12@\xa60\xa8\xb43\x87\x00\xf2K\x0e\xe7\x7fAz\xcd\xf5\xb0\x900\xa84'
    admin.settings.name = 'admin'

    disabled = [Channel(index=idx, role=0) for idx in range(3, 9)]

    anode.channels = [primary, admin] + disabled
    found = anode.getChannelByName('admin')
    assert found.index == 2


@pytest.mark.unit
def test_getChannelByName_invalid_name(capsys):
    """getChannelByName() must return None for a name no channel carries."""
    anode = Node('foo', 'bar')

    primary = Channel(index=1, role=1)
    primary.settings.modem_config = 3
    primary.settings.psk = b'\x01'

    admin = Channel(index=2, role=2)
    admin.settings.psk = b'\x8a\x94y\x0e\xc6\xc9\x1e5\x91\x12@\xa60\xa8\xb43\x87\x00\xf2K\x0e\xe7\x7fAz\xcd\xf5\xb0\x900\xa84'
    admin.settings.name = 'admin'

    disabled = [Channel(index=idx, role=0) for idx in range(3, 9)]

    anode.channels = [primary, admin] + disabled
    found = anode.getChannelByName('testing')
    assert found is None


@pytest.mark.unit
def test_getDisabledChannel(capsys):
    """getDisabledChannel() must return the first channel whose role is disabled.

    With one primary and two secondary channels in front, that is the
    channel with index 4.
    """
    anode = Node('foo', 'bar')

    shared_psk = b'\x8a\x94y\x0e\xc6\xc9\x1e5\x91\x12@\xa60\xa8\xb43\x87\x00\xf2K\x0e\xe7\x7fAz\xcd\xf5\xb0\x900\xa84'

    primary = Channel(index=1, role=1)
    primary.settings.modem_config = 3
    primary.settings.psk = b'\x01'

    secondary_a = Channel(index=2, role=2)
    secondary_a.settings.psk = shared_psk
    secondary_a.settings.name = 'testingA'

    secondary_b = Channel(index=3, role=2)
    secondary_b.settings.psk = shared_psk
    secondary_b.settings.name = 'testingB'

    disabled = [Channel(index=idx, role=0) for idx in range(4, 9)]

    anode.channels = [primary, secondary_a, secondary_b] + disabled
    found = anode.getDisabledChannel()
    assert found.index == 4


@pytest.mark.unit
def test_getDisabledChannel_where_all_channels_are_used(capsys):
    """getDisabledChannel() must return None when no channel is disabled."""
    anode = Node('foo', 'bar')

    primary = Channel(index=1, role=1)
    primary.settings.modem_config = 3
    primary.settings.psk = b'\x01'

    # every remaining slot is a secondary channel
    secondaries = [Channel(index=idx, role=2) for idx in range(2, 9)]

    anode.channels = [primary] + secondaries
    found = anode.getDisabledChannel()
    assert found is None


@pytest.mark.unit
def test_getAdminChannelIndex(capsys):
    """_getAdminChannelIndex() must return the index of the channel named 'admin'."""
    anode = Node('foo', 'bar')

    primary = Channel(index=1, role=1)
    primary.settings.modem_config = 3
    primary.settings.psk = b'\x01'

    admin = Channel(index=2, role=2)
    admin.settings.psk = b'\x8a\x94y\x0e\xc6\xc9\x1e5\x91\x12@\xa60\xa8\xb43\x87\x00\xf2K\x0e\xe7\x7fAz\xcd\xf5\xb0\x900\xa84'
    admin.settings.name = 'admin'

    disabled = [Channel(index=idx, role=0) for idx in range(3, 9)]

    anode.channels = [primary, admin] + disabled
    admin_index = anode._getAdminChannelIndex()
    assert admin_index == 2


@pytest.mark.unit
def test_getAdminChannelIndex_when_no_admin_named_channel(capsys):
    """_getAdminChannelIndex() must return 0 when no channel is named 'admin'."""
    anode = Node('foo', 'bar')

    primary = Channel(index=1, role=1)
    primary.settings.modem_config = 3
    primary.settings.psk = b'\x01'

    disabled = [Channel(index=idx, role=0) for idx in range(2, 9)]

    anode.channels = [primary] + disabled
    admin_index = anode._getAdminChannelIndex()
    assert admin_index == 0


# TODO: should we check if we need to turn it off?
@pytest.mark.unit
def test_turnOffEncryptionOnPrimaryChannel(capsys):
    """Call turnOffEncryptionOnPrimaryChannel() on a primary channel that has a psk.

    Only the printed confirmation is checked; stderr must stay empty.
    """
    anode = Node('foo', 'bar', noProto=True)

    primary = Channel(index=1, role=1)
    primary.settings.modem_config = 3
    # value from using "--ch-set psk 0x1a1a1a1a2b2b2b2b1a1a1a1a2b2b2b2b1a1a1a1a2b2b2b2b1a1a1a1a2b2b2b2b "
    primary.settings.psk = b'\x1a\x1a\x1a\x1a++++\x1a\x1a\x1a\x1a++++\x1a\x1a\x1a\x1a++++\x1a\x1a\x1a\x1a++++'

    disabled = [Channel(index=idx, role=0) for idx in range(2, 9)]

    anode.channels = [primary] + disabled
    anode.turnOffEncryptionOnPrimaryChannel()
    captured = capsys.readouterr()
    assert re.search(r'Writing modified channels to device', captured.out)
    assert captured.err == ''


@pytest.mark.unit
def test_writeConfig_with_no_radioConfig(capsys):
    """writeConfig() without a radioConfig must print an error and exit with 1."""
    anode = Node('foo', 'bar', noProto=True)

    with pytest.raises(SystemExit) as exit_info:
        anode.writeConfig()
    assert exit_info.type is SystemExit
    assert exit_info.value.code == 1
    captured = capsys.readouterr()
    assert re.search(r'Error: No RadioConfig has been read', captured.out)
    assert captured.err == ''


@pytest.mark.unit
def test_writeConfig(caplog):
    """writeConfig() with a RadioConfig set must log 'Wrote config'."""
    anode = Node('foo', 'bar', noProto=True)
    anode.radioConfig = RadioConfig()

    with caplog.at_level(logging.DEBUG):
        anode.writeConfig()
    found = re.search(r'Wrote config', caplog.text, re.MULTILINE)
    assert found


@pytest.mark.unit
def test_requestChannel_not_localNode(caplog):
    """_requestChannel(0) on a node that is not the interface's localNode.

    The debug log must mention that the channel is requested from a remote
    node.
    """
    iface = MagicMock(autospec=SerialInterface)
    serial_patch = patch('meshtastic.serial_interface.SerialInterface', return_value=iface)
    with serial_patch as mo:
        mo.localNode.getChannelByName.return_value = None
        mo.myInfo.max_channels = 8
        anode = Node(mo, 'bar', noProto=True)
        with caplog.at_level(logging.DEBUG):
            anode._requestChannel(0)
            found = re.search(r'Requesting channel 0 info from remote node', caplog.text, re.MULTILINE)
            assert found


@pytest.mark.unit
def test_requestChannel_localNode(caplog):
    """_requestChannel(0) on the node that the interface reports as localNode.

    The debug log must mention the request but must not say it goes to a
    remote node.
    """
    iface = MagicMock(autospec=SerialInterface)
    serial_patch = patch('meshtastic.serial_interface.SerialInterface', return_value=iface)
    with serial_patch as mo:
        mo.localNode.getChannelByName.return_value = None
        mo.myInfo.max_channels = 8
        anode = Node(mo, 'bar', noProto=True)

        # Make the mocked interface report this node as its localNode.
        mo.localNode = anode

        with caplog.at_level(logging.DEBUG):
            anode._requestChannel(0)
            assert re.search(r'Requesting channel 0', caplog.text, re.MULTILINE)
            assert re.search(r'from remote node', caplog.text, re.MULTILINE) is None


@pytest.mark.unit
def test_onResponseRequestChannel(caplog):
    """Feed two channel responses to onResponseRequestChannel().

    The first packet carries the primary channel, the second a disabled
    one. Afterwards the log must report that the download finished and the
    node must hold eight channels, of which only the first is configured.
    """
    primary = Channel(index=1, role=1)
    primary.settings.modem_config = 3
    primary.settings.psk = b'\x01'

    # The handler's input is the mock stored under 'raw'; its
    # get_channel_response attribute is a real Channel.
    primary_msg = MagicMock(autospec=AdminMessage)
    primary_msg.get_channel_response = primary

    disabled_msg = MagicMock(autospec=AdminMessage)
    disabled_msg.get_channel_response = Channel(index=2, role=0)  # disabled

    # default primary channel
    packet1 = {
        'from': 2475227164,
        'to': 2475227164,
        'decoded': {
            'portnum': 'ADMIN_APP',
            'payload': b':\t\x12\x05\x18\x03"\x01\x01\x18\x01',
            'requestId': 2615094405,
            'admin': {
                'getChannelResponse': {
                    'settings': {
                        'modemConfig': 'Bw125Cr48Sf4096',
                        'psk': 'AQ==',
                    },
                    'role': 'PRIMARY',
                },
                'raw': primary_msg,
            },
        },
        'id': 1692918436,
        'hopLimit': 3,
        'priority': 'RELIABLE',
        'raw': 'fake',
        'fromId': '!9388f81c',
        'toId': '!9388f81c',
    }

    # no other channels
    packet2 = {
        'from': 2475227164,
        'to': 2475227164,
        'decoded': {
            'portnum': 'ADMIN_APP',
            'payload': b':\x04\x08\x02\x12\x00',
            'requestId': 743049663,
            'admin': {
                'getChannelResponse': {
                    'index': 2,
                    'settings': {},
                },
                'raw': disabled_msg,
            },
        },
        'id': 1692918456,
        'rxTime': 1640202239,
        'hopLimit': 3,
        'priority': 'RELIABLE',
        'raw': 'faked',
        'fromId': '!9388f81c',
        'toId': '!9388f81c',
    }

    iface = MagicMock(autospec=SerialInterface)
    with patch('meshtastic.serial_interface.SerialInterface', return_value=iface) as mo:
        mo.localNode.getChannelByName.return_value = None
        mo.myInfo.max_channels = 8
        anode = Node(mo, 'bar', noProto=True)
        anode.radioConfig = RadioConfig()

        # Make the mocked interface report this node as its localNode.
        mo.localNode = anode

        with caplog.at_level(logging.DEBUG):
            anode.requestConfig()
            anode.onResponseRequestChannel(packet1)
            assert re.search(r'Received channel', caplog.text, re.MULTILINE)
            anode.onResponseRequestChannel(packet2)
            assert re.search(r'Received channel', caplog.text, re.MULTILINE)
            assert re.search(r'Finished downloading channels', caplog.text, re.MULTILINE)
            assert len(anode.channels) == 8
            assert anode.channels[0].settings.modem_config == 3
            for slot in range(1, 8):
                assert anode.channels[slot].settings.name == ''


@pytest.mark.unit
def test_onResponseRequestSetting(caplog):
    """onResponseRequestSettings() with a normal reply must log that channels come next."""
    # The get_radio_response value lives on a MagicMock so that the packet
    # dict can carry it under the 'raw' key.
    amsg = MagicMock(autospec=AdminMessage)
    amsg.get_radio_response = """{
  preferences {
    phone_timeout_secs: 900
    ls_secs: 300
    position_broadcast_smart: true
    position_flags: 35
  }
}"""
    preferences = {
        'phoneTimeoutSecs': 900,
        'lsSecs': 300,
        'positionBroadcastSmart': True,
        'positionFlags': 35,
    }
    packet = {
        'from': 2475227164,
        'to': 2475227164,
        'decoded': {
            'portnum': 'ADMIN_APP',
            'payload': b'*\x0e\n\x0c0\x84\x07P\xac\x02\x88\x01\x01\xb0\t#',
            'requestId': 3145147848,
            'admin': {
                'getRadioResponse': {'preferences': preferences},
                'raw': amsg,
            },
            'id': 365963704,
            'rxTime': 1640195197,
            'hopLimit': 3,
            'priority': 'RELIABLE',
            'raw': 'faked',
            'fromId': '!9388f81c',
            'toId': '!9388f81c',
        },
    }
    iface = MagicMock(autospec=SerialInterface)
    with patch('meshtastic.serial_interface.SerialInterface', return_value=iface) as mo:
        mo.localNode.getChannelByName.return_value = None
        mo.myInfo.max_channels = 8
        anode = Node(mo, 'bar', noProto=True)
        anode.radioConfig = RadioConfig()

        # Make the mocked interface report this node as its localNode.
        mo.localNode = anode

        with caplog.at_level(logging.DEBUG):
            anode.onResponseRequestSettings(packet)
            found = re.search(r'Received radio config, now fetching channels..', caplog.text, re.MULTILINE)
            assert found


@pytest.mark.unit
def test_onResponseRequestSetting_with_error(capsys):
    """onResponseRequestSettings() with a routing errorReason must print an error.

    The message is expected on stdout; stderr must stay empty.
    """
    preferences = {
        'phoneTimeoutSecs': 900,
        'lsSecs': 300,
        'positionBroadcastSmart': True,
        'positionFlags': 35,
    }
    packet = {
        'from': 2475227164,
        'to': 2475227164,
        'decoded': {
            'portnum': 'ADMIN_APP',
            'payload': b'*\x0e\n\x0c0\x84\x07P\xac\x02\x88\x01\x01\xb0\t#',
            'requestId': 3145147848,
            'routing': {
                'errorReason': 'some made up error',
            },
            'admin': {
                'getRadioResponse': {'preferences': preferences},
            },
            'id': 365963704,
            'rxTime': 1640195197,
            'hopLimit': 3,
            'priority': 'RELIABLE',
            'fromId': '!9388f81c',
            'toId': '!9388f81c',
        },
    }
    iface = MagicMock(autospec=SerialInterface)
    with patch('meshtastic.serial_interface.SerialInterface', return_value=iface) as mo:
        mo.localNode.getChannelByName.return_value = None
        mo.myInfo.max_channels = 8
        anode = Node(mo, 'bar', noProto=True)
        anode.radioConfig = RadioConfig()

        # Make the mocked interface report this node as its localNode.
        mo.localNode = anode

        anode.onResponseRequestSettings(packet)
        captured = capsys.readouterr()
        assert re.search(r'Error on response', captured.out)
        assert captured.err == ''

Functions

def test_deleteChannel_secondary()

Try to delete a secondary channel.

Expand source code
@pytest.mark.unit
def test_deleteChannel_secondary():
    """Delete the only secondary channel and check the resulting channel list.

    After deleteChannel(1) the list must still hold eight channels, the
    primary must keep its modem_config and no remaining slot may be named.
    """
    primary = Channel(index=1, role=1)
    primary.settings.modem_config = 3
    primary.settings.psk = b'\x01'

    secondary = Channel(index=2, role=2)
    secondary.settings.psk = b'\x8a\x94y\x0e\xc6\xc9\x1e5\x91\x12@\xa60\xa8\xb43\x87\x00\xf2K\x0e\xe7\x7fAz\xcd\xf5\xb0\x900\xa84'
    secondary.settings.name = 'testing'

    channels = [primary, secondary] + [Channel(index=idx, role=0) for idx in range(3, 9)]

    iface = MagicMock(autospec=SerialInterface)
    with patch('meshtastic.serial_interface.SerialInterface', return_value=iface) as mo:
        mo.localNode.getChannelByName.return_value = None
        mo.myInfo.max_channels = 8
        anode = Node(mo, 'bar', noProto=True)

        # The node gets the very list object the assertions below inspect.
        anode.channels = channels
        assert len(anode.channels) == 8
        assert channels[0].settings.modem_config == 3
        assert channels[1].settings.name == 'testing'
        for slot in range(2, 8):
            assert channels[slot].settings.name == ''

        anode.deleteChannel(1)

        assert len(anode.channels) == 8
        assert channels[0].settings.modem_config == 3
        for slot in range(1, 8):
            assert channels[slot].settings.name == ''
def test_deleteChannel_secondary_with_admin_channel_after_testing()

Try to delete a secondary channel where there is an admin channel.

Expand source code
@pytest.mark.unit
def test_deleteChannel_secondary_with_admin_channel_after_testing():
    """Delete a secondary channel that sits in front of an 'admin' channel."""

    # Slot 0: primary channel (role=1).
    primary = Channel(index=1, role=1)
    primary.settings.modem_config = 3
    primary.settings.psk = b'\x01'

    # Slot 1: the secondary channel that gets deleted.
    testing = Channel(index=2, role=2)
    testing.settings.psk = b'\x8a\x94y\x0e\xc6\xc9\x1e5\x91\x12@\xa60\xa8\xb43\x87\x00\xf2K\x0e\xe7\x7fAz\xcd\xf5\xb0\x900\xa84'
    testing.settings.name = 'testing'

    # Slot 2: secondary channel named 'admin'.
    admin = Channel(index=3, role=2)
    admin.settings.name = 'admin'

    # Slots 3-7: disabled channels (role=0).
    channels = [primary, testing, admin] + [Channel(index=idx, role=0) for idx in range(4, 9)]

    iface = MagicMock(autospec=SerialInterface)
    with patch('meshtastic.serial_interface.SerialInterface', return_value=iface) as mo:
        mo.localNode.getChannelByName.return_value = None
        mo.myInfo.max_channels = 8
        anode = Node(mo, 'bar', noProto=True)

        # Pin localNode to the node under test; otherwise the mock would hand
        # back an auto-created MagicMock attribute instead.
        mo.localNode = anode

        assert mo.localNode == anode

        anode.channels = channels

        # State before the deletion.
        assert len(anode.channels) == 8
        assert channels[0].settings.modem_config == 3
        assert channels[1].settings.name == 'testing'
        assert channels[2].settings.name == 'admin'
        for slot in range(3, 8):
            assert channels[slot].settings.name == ''

        anode.deleteChannel(1)

        # State after the deletion: 'admin' is now found in slot 1.
        assert len(anode.channels) == 8
        assert channels[0].settings.modem_config == 3
        assert channels[1].settings.name == 'admin'
        for slot in range(2, 8):
            assert channels[slot].settings.name == ''
def test_deleteChannel_secondary_with_admin_channel_before_testing()

Try to delete a secondary channel where there is an admin channel.

Expand source code
@pytest.mark.unit
def test_deleteChannel_secondary_with_admin_channel_before_testing():
    """Delete a secondary channel that sits behind an 'admin' channel."""

    # Slot 0: primary channel (role=1).
    primary = Channel(index=1, role=1)
    primary.settings.modem_config = 3
    primary.settings.psk = b'\x01'

    # Slot 1: secondary channel named 'admin'.
    admin = Channel(index=2, role=2)
    admin.settings.psk = b'\x8a\x94y\x0e\xc6\xc9\x1e5\x91\x12@\xa60\xa8\xb43\x87\x00\xf2K\x0e\xe7\x7fAz\xcd\xf5\xb0\x900\xa84'
    admin.settings.name = 'admin'

    # Slot 2: the secondary channel that gets deleted.
    testing = Channel(index=3, role=2)
    testing.settings.name = 'testing'

    # Slots 3-7: disabled channels (role=0).
    channels = [primary, admin, testing] + [Channel(index=idx, role=0) for idx in range(4, 9)]

    iface = MagicMock(autospec=SerialInterface)
    with patch('meshtastic.serial_interface.SerialInterface', return_value=iface) as mo:
        mo.localNode.getChannelByName.return_value = None
        mo.myInfo.max_channels = 8
        anode = Node(mo, 'bar', noProto=True)

        anode.channels = channels

        # State before the deletion.
        assert len(anode.channels) == 8
        assert channels[0].settings.modem_config == 3
        assert channels[1].settings.name == 'admin'
        assert channels[2].settings.name == 'testing'
        for slot in range(3, 8):
            assert channels[slot].settings.name == ''

        anode.deleteChannel(2)

        # State after the deletion: 'admin' stays in slot 1, slot 2 is unnamed.
        assert len(anode.channels) == 8
        assert channels[0].settings.modem_config == 3
        assert channels[1].settings.name == 'admin'
        for slot in range(2, 8):
            assert channels[slot].settings.name == ''
def test_deleteChannel_try_to_delete_primary_channel(capsys)

Try to delete primary channel.

Expand source code
@pytest.mark.unit
def test_deleteChannel_try_to_delete_primary_channel(capsys):
    """Check that deleting the primary channel exits with status 1 and a warning.

    The node holds one primary channel and seven disabled ones, so slot 0 is
    the only candidate and it must be refused.
    """
    anode = Node('foo', 'bar')

    channel1 = Channel(index=1, role=1)
    channel1.settings.modem_config = 3
    channel1.settings.psk = b'\x01'

    # no secondary channels: every other slot is disabled (role=0)
    channels = [channel1] + [Channel(index=idx, role=0) for idx in range(2, 9)]

    anode.channels = channels
    with pytest.raises(SystemExit) as pytest_wrapped_e:
        anode.deleteChannel(0)
    # Classes are compared by identity, not equality.
    assert pytest_wrapped_e.type is SystemExit
    assert pytest_wrapped_e.value.code == 1
    out, err = capsys.readouterr()
    assert re.search(r'Warning: Only SECONDARY channels can be deleted', out, re.MULTILINE)
    assert err == ''
def test_exitSimulator(caplog)

Test exitSimulator

Expand source code
@pytest.mark.unit
def test_exitSimulator(caplog):
    """Call exitSimulator() and look for its debug log line."""
    node = Node('foo', 'bar', noProto=True)
    with caplog.at_level(logging.DEBUG):
        node.exitSimulator()
    logged = caplog.text
    assert re.search(r'in exitSimulator', logged, re.MULTILINE)
def test_getAdminChannelIndex(capsys)

Get the 'admin' channel index.

Expand source code
@pytest.mark.unit
def test_getAdminChannelIndex(capsys):
    """Check _getAdminChannelIndex() when a channel named 'admin' exists."""
    # Primary channel (role=1).
    primary = Channel(index=1, role=1)
    primary.settings.modem_config = 3
    primary.settings.psk = b'\x01'

    # Secondary channel (role=2) named 'admin', created with index=2.
    admin = Channel(index=2, role=2)
    admin.settings.psk = b'\x8a\x94y\x0e\xc6\xc9\x1e5\x91\x12@\xa60\xa8\xb43\x87\x00\xf2K\x0e\xe7\x7fAz\xcd\xf5\xb0\x900\xa84'
    admin.settings.name = 'admin'

    # Remaining six slots are disabled (role=0).
    disabled = [Channel(index=idx, role=0) for idx in range(3, 9)]

    anode = Node('foo', 'bar')
    anode.channels = [primary, admin] + disabled

    admin_index = anode._getAdminChannelIndex()
    assert admin_index == 2
def test_getAdminChannelIndex_when_no_admin_named_channel(capsys)

Get the 'admin' channel when there is not one.

Expand source code
@pytest.mark.unit
def test_getAdminChannelIndex_when_no_admin_named_channel(capsys):
    """Check that _getAdminChannelIndex() gives 0 when no channel is named 'admin'."""
    # Primary channel (role=1); it carries no name.
    primary = Channel(index=1, role=1)
    primary.settings.modem_config = 3
    primary.settings.psk = b'\x01'

    # Every other slot is disabled (role=0).
    disabled = [Channel(index=idx, role=0) for idx in range(2, 9)]

    anode = Node('foo', 'bar')
    anode.channels = [primary] + disabled

    admin_index = anode._getAdminChannelIndex()
    assert admin_index == 0
def test_getChannelByChannelIndex()

Test getChannelByChannelIndex()

Expand source code
@pytest.mark.unit
def test_getChannelByChannelIndex():
    """Look up channels by list position, including out-of-range positions."""
    primary = Channel(index=1, role=1)
    secondary = Channel(index=2, role=2)
    disabled = [Channel(index=idx, role=0) for idx in range(3, 9)]

    anode = Node('foo', 'bar')
    anode.channels = [primary, secondary] + disabled

    # Positions 0, 1 and 2 hold a primary, a secondary and a disabled channel.
    for position in (0, 1, 2):
        assert anode.getChannelByChannelIndex(position) is not None

    # Positions outside the eight-slot list yield None.
    for position in (-1, 9):
        assert anode.getChannelByChannelIndex(position) is None
def test_getChannelByName(capsys)

Get a channel by the name.

Expand source code
@pytest.mark.unit
def test_getChannelByName(capsys):
    """Find the channel named 'admin' through getChannelByName()."""
    # Primary channel (role=1).
    primary = Channel(index=1, role=1)
    primary.settings.modem_config = 3
    primary.settings.psk = b'\x01'

    # Secondary channel (role=2) carrying the name being searched for.
    admin = Channel(index=2, role=2)
    admin.settings.psk = b'\x8a\x94y\x0e\xc6\xc9\x1e5\x91\x12@\xa60\xa8\xb43\x87\x00\xf2K\x0e\xe7\x7fAz\xcd\xf5\xb0\x900\xa84'
    admin.settings.name = 'admin'

    # Remaining six slots are disabled (role=0).
    disabled = [Channel(index=idx, role=0) for idx in range(3, 9)]

    anode = Node('foo', 'bar')
    anode.channels = [primary, admin] + disabled

    found = anode.getChannelByName('admin')
    assert found.index == 2
def test_getChannelByName_invalid_name(capsys)

Get a channel by the name but one that is not present.

Expand source code
@pytest.mark.unit
def test_getChannelByName_invalid_name(capsys):
    """Ask getChannelByName() for a name that no channel carries."""
    # Primary channel (role=1).
    primary = Channel(index=1, role=1)
    primary.settings.modem_config = 3
    primary.settings.psk = b'\x01'

    # The only named channel is 'admin'.
    admin = Channel(index=2, role=2)
    admin.settings.psk = b'\x8a\x94y\x0e\xc6\xc9\x1e5\x91\x12@\xa60\xa8\xb43\x87\x00\xf2K\x0e\xe7\x7fAz\xcd\xf5\xb0\x900\xa84'
    admin.settings.name = 'admin'

    # Remaining six slots are disabled (role=0).
    disabled = [Channel(index=idx, role=0) for idx in range(3, 9)]

    anode = Node('foo', 'bar')
    anode.channels = [primary, admin] + disabled

    found = anode.getChannelByName('testing')
    assert found is None
def test_getDisabledChannel(capsys)

Get the first disabled channel.

Expand source code
@pytest.mark.unit
def test_getDisabledChannel(capsys):
    """Check that getDisabledChannel() returns the first role=0 channel."""
    secret_psk = b'\x8a\x94y\x0e\xc6\xc9\x1e5\x91\x12@\xa60\xa8\xb43\x87\x00\xf2K\x0e\xe7\x7fAz\xcd\xf5\xb0\x900\xa84'

    # Primary channel (role=1).
    primary = Channel(index=1, role=1)
    primary.settings.modem_config = 3
    primary.settings.psk = b'\x01'

    # Two secondary channels (role=2) occupy the next two slots.
    first_secondary = Channel(index=2, role=2)
    first_secondary.settings.psk = secret_psk
    first_secondary.settings.name = 'testingA'

    second_secondary = Channel(index=3, role=2)
    second_secondary.settings.psk = secret_psk
    second_secondary.settings.name = 'testingB'

    # Disabled channels start at index=4.
    disabled = [Channel(index=idx, role=0) for idx in range(4, 9)]

    anode = Node('foo', 'bar')
    anode.channels = [primary, first_secondary, second_secondary] + disabled

    found = anode.getDisabledChannel()
    assert found.index == 4
def test_getDisabledChannel_where_all_channels_are_used(capsys)

Try to get a disabled channel when every channel is in use.

Expand source code
@pytest.mark.unit
def test_getDisabledChannel_where_all_channels_are_used(capsys):
    """Check that getDisabledChannel() returns None when no channel has role=0."""
    # Primary channel (role=1).
    primary = Channel(index=1, role=1)
    primary.settings.modem_config = 3
    primary.settings.psk = b'\x01'

    # All seven remaining slots are secondary channels (role=2).
    secondaries = [Channel(index=idx, role=2) for idx in range(2, 9)]

    anode = Node('foo', 'bar')
    anode.channels = [primary] + secondaries

    found = anode.getDisabledChannel()
    assert found is None
def test_node(capsys)

Test that we can instantiate a Node

Expand source code
@pytest.mark.unit
def test_node(capsys):
    """Build a Node with an empty RadioConfig and check what it prints."""
    anode = Node('foo', 'bar')
    anode.radioConfig = RadioConfig()

    anode.showChannels()
    anode.showInfo()

    out, err = capsys.readouterr()
    for heading in (r'Preferences', r'Channels', r'Primary channel URL'):
        assert re.search(heading, out)
    assert err == ''
def test_node_reqquestConfig()

Test run requestConfig

Expand source code
@pytest.mark.unit
def test_node_reqquestConfig():
    """Run requestConfig() against mocked interface and admin-message classes.

    The test makes no assertions; it only checks that the call completes.
    """
    iface = MagicMock(autospec=SerialInterface)
    amesg = MagicMock(autospec=AdminMessage)
    serial_patch = patch('meshtastic.serial_interface.SerialInterface', return_value=iface)
    admin_patch = patch('meshtastic.admin_pb2.AdminMessage', return_value=amesg)
    with serial_patch as mo, admin_patch:
        anode = Node(mo, 'bar')
        anode.requestConfig()
def test_onResponseRequestChannel(caplog)

Test onResponseRequestChannel()

Expand source code
@pytest.mark.unit
def test_onResponseRequestChannel(caplog):
    """Test onResponseRequestChannel()

    Feeds two channel responses to the node: a primary channel, then a
    disabled one. The test then checks the debug log and the resulting
    eight-entry channel list.
    """

    # Channel carried by the first response: primary (role=1).
    channel1 = Channel(index=1, role=1)
    channel1.settings.modem_config = 3
    channel1.settings.psk = b'\x01'

    # msg1/msg2 stand in for the 'raw' admin message of each packet; only
    # get_channel_response is set explicitly.
    # NOTE(review): MagicMock has no `autospec` parameter; the keyword is
    # stored as a plain attribute, so no spec is enforced on these mocks.
    msg1 = MagicMock(autospec=AdminMessage)
    msg1.get_channel_response = channel1

    msg2 = MagicMock(autospec=AdminMessage)
    channel2 = Channel(index=2, role=0) # disabled
    msg2.get_channel_response = channel2

    # default primary channel
    packet1 = {
        'from': 2475227164,
        'to': 2475227164,
        'decoded': {
            'portnum': 'ADMIN_APP',
            'payload': b':\t\x12\x05\x18\x03"\x01\x01\x18\x01',
            'requestId': 2615094405,
            'admin': {
                'getChannelResponse': {
                    'settings': {
                        'modemConfig': 'Bw125Cr48Sf4096',
                        'psk': 'AQ=='
                    },
                    'role': 'PRIMARY'
                },
                'raw': msg1,
            }
        },
        'id': 1692918436,
        'hopLimit': 3,
        'priority':
        'RELIABLE',
        'raw': 'fake',
        'fromId': '!9388f81c',
        'toId': '!9388f81c'
        }

    # no other channels
    packet2 = {
        'from': 2475227164,
        'to': 2475227164,
        'decoded': {
            'portnum': 'ADMIN_APP',
            'payload': b':\x04\x08\x02\x12\x00',
            'requestId': 743049663,
            'admin': {
                'getChannelResponse': {
                    'index': 2,
                    'settings': {}
                },
                'raw': msg2,
            }
        },
        'id': 1692918456,
        'rxTime': 1640202239,
        'hopLimit': 3,
        'priority': 'RELIABLE',
        'raw': 'faked',
        'fromId': '!9388f81c',
        'toId': '!9388f81c'
    }

    iface = MagicMock(autospec=SerialInterface)
    with patch('meshtastic.serial_interface.SerialInterface', return_value=iface) as mo:
        mo.localNode.getChannelByName.return_value = None
        mo.myInfo.max_channels = 8
        anode = Node(mo, 'bar', noProto=True)

        radioConfig = RadioConfig()
        anode.radioConfig = radioConfig

        # Note: Have to do this next line because every call to MagicMock object/method returns a new magic mock
        mo.localNode = anode

        with caplog.at_level(logging.DEBUG):
            # requestConfig() is called before the responses are delivered.
            anode.requestConfig()
            anode.onResponseRequestChannel(packet1)
            assert re.search(r'Received channel', caplog.text, re.MULTILINE)
            # The second response carries a disabled channel; the log is then
            # expected to report that the channel download finished.
            anode.onResponseRequestChannel(packet2)
            assert re.search(r'Received channel', caplog.text, re.MULTILINE)
            assert re.search(r'Finished downloading channels', caplog.text, re.MULTILINE)
            # Eight channels are expected: the primary plus seven unnamed ones.
            assert len(anode.channels) == 8
            assert anode.channels[0].settings.modem_config == 3
            assert anode.channels[1].settings.name == ''
            assert anode.channels[2].settings.name == ''
            assert anode.channels[3].settings.name == ''
            assert anode.channels[4].settings.name == ''
            assert anode.channels[5].settings.name == ''
            assert anode.channels[6].settings.name == ''
            assert anode.channels[7].settings.name == ''
def test_onResponseRequestSetting(caplog)

Test onResponseRequestSetting()

Expand source code
@pytest.mark.unit
def test_onResponseRequestSetting(caplog):
    """Deliver a radio-config response to onResponseRequestSettings() and check the log."""
    # The 'raw' admin message is a mock so that get_radio_response can be
    # provided as an attribute next to the decoded dict form.
    raw_admin = MagicMock(autospec=AdminMessage)
    raw_admin.get_radio_response = """{
  preferences {
    phone_timeout_secs: 900
    ls_secs: 300
    position_broadcast_smart: true
    position_flags: 35
  }
}"""
    preferences = {
        'phoneTimeoutSecs': 900,
        'lsSecs': 300,
        'positionBroadcastSmart': True,
        'positionFlags': 35,
    }
    decoded = {
        'portnum': 'ADMIN_APP',
        'payload': b'*\x0e\n\x0c0\x84\x07P\xac\x02\x88\x01\x01\xb0\t#',
        'requestId': 3145147848,
        'admin': {
            'getRadioResponse': {'preferences': preferences},
            'raw': raw_admin,
        },
        'id': 365963704,
        'rxTime': 1640195197,
        'hopLimit': 3,
        'priority': 'RELIABLE',
        'raw': 'faked',
        'fromId': '!9388f81c',
        'toId': '!9388f81c',
    }
    packet = {'from': 2475227164, 'to': 2475227164, 'decoded': decoded}

    iface = MagicMock(autospec=SerialInterface)
    with patch('meshtastic.serial_interface.SerialInterface', return_value=iface) as mo:
        mo.localNode.getChannelByName.return_value = None
        mo.myInfo.max_channels = 8
        anode = Node(mo, 'bar', noProto=True)
        anode.radioConfig = RadioConfig()

        # Pin localNode to the node under test; otherwise the mock would hand
        # back an auto-created MagicMock attribute instead.
        mo.localNode = anode

        with caplog.at_level(logging.DEBUG):
            anode.onResponseRequestSettings(packet)
            assert re.search(r'Received radio config, now fetching channels..', caplog.text, re.MULTILINE)
def test_onResponseRequestSetting_with_error(capsys)

Test onResponseRequestSetting() with an error

Expand source code
@pytest.mark.unit
def test_onResponseRequestSetting_with_error(capsys):
    """Deliver a response that carries a routing error and check the printed message."""
    preferences = {
        'phoneTimeoutSecs': 900,
        'lsSecs': 300,
        'positionBroadcastSmart': True,
        'positionFlags': 35,
    }
    decoded = {
        'portnum': 'ADMIN_APP',
        'payload': b'*\x0e\n\x0c0\x84\x07P\xac\x02\x88\x01\x01\xb0\t#',
        'requestId': 3145147848,
        # The routing section is what marks this response as an error.
        'routing': {
            'errorReason': 'some made up error',
        },
        'admin': {
            'getRadioResponse': {'preferences': preferences},
        },
        'id': 365963704,
        'rxTime': 1640195197,
        'hopLimit': 3,
        'priority': 'RELIABLE',
        'fromId': '!9388f81c',
        'toId': '!9388f81c',
    }
    packet = {'from': 2475227164, 'to': 2475227164, 'decoded': decoded}

    iface = MagicMock(autospec=SerialInterface)
    with patch('meshtastic.serial_interface.SerialInterface', return_value=iface) as mo:
        mo.localNode.getChannelByName.return_value = None
        mo.myInfo.max_channels = 8
        anode = Node(mo, 'bar', noProto=True)
        anode.radioConfig = RadioConfig()

        # Pin localNode to the node under test; otherwise the mock would hand
        # back an auto-created MagicMock attribute instead.
        mo.localNode = anode

        anode.onResponseRequestSettings(packet)
        out, err = capsys.readouterr()
        assert re.search(r'Error on response', out)
        assert err == ''
def test_reboot(caplog)

Test reboot

Expand source code
@pytest.mark.unit
def test_reboot(caplog):
    """Call reboot() and look for its debug log line."""
    node = Node('foo', 'bar', noProto=True)
    with caplog.at_level(logging.DEBUG):
        node.reboot()
    logged = caplog.text
    assert re.search(r'Telling node to reboot', logged, re.MULTILINE)
def test_requestChannel_localNode(caplog)

Test _requestChannel()

Expand source code
@pytest.mark.unit
def test_requestChannel_localNode(caplog):
    """Request channel 0 when the node is the interface's localNode."""
    iface = MagicMock(autospec=SerialInterface)
    with patch('meshtastic.serial_interface.SerialInterface', return_value=iface) as mo:
        mo.localNode.getChannelByName.return_value = None
        mo.myInfo.max_channels = 8
        anode = Node(mo, 'bar', noProto=True)

        # Pin localNode to the node under test; otherwise the mock would hand
        # back an auto-created MagicMock attribute instead.
        mo.localNode = anode

        with caplog.at_level(logging.DEBUG):
            anode._requestChannel(0)
            # The request is logged, but without the remote-node wording.
            assert re.search(r'Requesting channel 0', caplog.text, re.MULTILINE)
            assert not re.search(r'from remote node', caplog.text, re.MULTILINE)
def test_requestChannel_not_localNode(caplog)

Test _requestChannel()

Expand source code
@pytest.mark.unit
def test_requestChannel_not_localNode(caplog):
    """Request channel 0 when the node is not the interface's localNode."""
    iface = MagicMock(autospec=SerialInterface)
    with patch('meshtastic.serial_interface.SerialInterface', return_value=iface) as mo:
        mo.localNode.getChannelByName.return_value = None
        mo.myInfo.max_channels = 8
        # mo.localNode is left as the mock's own attribute, so it is not anode.
        anode = Node(mo, 'bar', noProto=True)
        with caplog.at_level(logging.DEBUG):
            anode._requestChannel(0)
            expected = r'Requesting channel 0 info from remote node'
            assert re.search(expected, caplog.text, re.MULTILINE)
def test_setOwner_and_team(caplog)

Test setOwner

Expand source code
@pytest.mark.unit
def test_setOwner_and_team(caplog):
    """Call setOwner() with long name, short name and team, then check the debug log."""
    node = Node('foo', 'bar', noProto=True)
    with caplog.at_level(logging.DEBUG):
        node.setOwner(long_name='Test123', short_name='123', team=1)

    expected_patterns = (
        r'p.set_owner.long_name:Test123:',
        r'p.set_owner.short_name:123:',
        r'p.set_owner.is_licensed:False',
        r'p.set_owner.team:1',
    )
    for pattern in expected_patterns:
        assert re.search(pattern, caplog.text, re.MULTILINE)
def test_setOwner_no_short_name(caplog)

Test setOwner

Expand source code
@pytest.mark.unit
def test_setOwner_no_short_name(caplog):
    """Call setOwner() with only a long name; the log should show short name 'Tst'."""
    node = Node('foo', 'bar', noProto=True)
    with caplog.at_level(logging.DEBUG):
        node.setOwner(long_name='Test123')

    expected_patterns = (
        r'p.set_owner.long_name:Test123:',
        r'p.set_owner.short_name:Tst:',
        r'p.set_owner.is_licensed:False',
        r'p.set_owner.team:0',
    )
    for pattern in expected_patterns:
        assert re.search(pattern, caplog.text, re.MULTILINE)
def test_setOwner_no_short_name_and_long_name_has_words(caplog)

Test setOwner

Expand source code
@pytest.mark.unit
def test_setOwner_no_short_name_and_long_name_has_words(caplog):
    """Call setOwner() with a multi-word long name; the log should show short name 'ABC'."""
    node = Node('foo', 'bar', noProto=True)
    with caplog.at_level(logging.DEBUG):
        node.setOwner(long_name='A B C', is_licensed=True)

    expected_patterns = (
        r'p.set_owner.long_name:A B C:',
        r'p.set_owner.short_name:ABC:',
        r'p.set_owner.is_licensed:True',
        r'p.set_owner.team:0',
    )
    for pattern in expected_patterns:
        assert re.search(pattern, caplog.text, re.MULTILINE)
def test_setOwner_no_short_name_and_long_name_is_short(caplog)

Test setOwner

Expand source code
@pytest.mark.unit
def test_setOwner_no_short_name_and_long_name_is_short(caplog):
    """Call setOwner() with a three-letter long name; the log should show it as the short name too."""
    node = Node('foo', 'bar', noProto=True)
    with caplog.at_level(logging.DEBUG):
        node.setOwner(long_name='Tnt')

    expected_patterns = (
        r'p.set_owner.long_name:Tnt:',
        r'p.set_owner.short_name:Tnt:',
        r'p.set_owner.is_licensed:False',
        r'p.set_owner.team:0',
    )
    for pattern in expected_patterns:
        assert re.search(pattern, caplog.text, re.MULTILINE)
def test_setURL_empty_url()

Test setURL with an empty URL

Expand source code
@pytest.mark.unit
def test_setURL_empty_url():
    """Test that setURL() with an empty URL exits with status 1."""
    anode = Node('foo', 'bar', noProto=True)
    with pytest.raises(SystemExit) as pytest_wrapped_e:
        anode.setURL('')
    # Classes are compared by identity, not equality.
    assert pytest_wrapped_e.type is SystemExit
    assert pytest_wrapped_e.value.code == 1
def test_setURL_valid_URL(caplog)

Test setURL

Expand source code
@pytest.mark.unit
def test_setURL_valid_URL(caplog):
    """Call setURL() with a URL that carries channel settings and check the debug log."""
    url = "https://www.meshtastic.org/d/#CgUYAyIBAQ"
    iface = MagicMock(autospec=SerialInterface)
    with caplog.at_level(logging.DEBUG):
        anode = Node(iface, 'bar', noProto=True)
        # Placeholder values; this test only inspects the log output.
        anode.radioConfig = 'baz'
        anode.channels = ['zoo']
        anode.setURL(url)

    expected_patterns = (
        r'Channel i:0',
        r'modem_config: Bw125Cr48Sf4096',
        r'psk: "\\001"',
        r'role: PRIMARY',
    )
    for pattern in expected_patterns:
        assert re.search(pattern, caplog.text, re.MULTILINE)
def test_setURL_valid_URL_but_no_settings(caplog)

Test setURL

Expand source code
@pytest.mark.unit
def test_setURL_valid_URL_but_no_settings(caplog):
    """Test that setURL() exits with status 1 when the URL carries no settings."""
    iface = MagicMock(autospec=SerialInterface)
    url = "https://www.meshtastic.org/d/#"

    # Set the node up outside the raises block, so that only a SystemExit
    # coming from setURL() itself can satisfy the test.
    anode = Node(iface, 'bar', noProto=True)
    anode.radioConfig = 'baz'

    with pytest.raises(SystemExit) as pytest_wrapped_e:
        anode.setURL(url)
    # Classes are compared by identity, not equality.
    assert pytest_wrapped_e.type is SystemExit
    assert pytest_wrapped_e.value.code == 1
def test_showChannels(capsys)

Test showChannels

Expand source code
@pytest.mark.unit
def test_showChannels(capsys):
    """Print a primary and a secondary channel with showChannels() and check the output."""
    # role: 0=Disabled, 1=Primary, 2=Secondary
    # modem_config: 0-5
    primary = Channel(index=1, role=1)
    primary.settings.modem_config = 3
    primary.settings.psk = b'\x01'

    secondary = Channel(index=2, role=2)
    secondary.settings.psk = b'\x8a\x94y\x0e\xc6\xc9\x1e5\x91\x12@\xa60\xa8\xb43\x87\x00\xf2K\x0e\xe7\x7fAz\xcd\xf5\xb0\x900\xa84'
    secondary.settings.name = 'testing'

    disabled = [Channel(index=idx, role=0) for idx in range(3, 9)]

    anode = Node('foo', 'bar')
    anode.channels = [primary, secondary] + disabled
    anode.showChannels()
    out, err = capsys.readouterr()

    expected_patterns = (
        r'Channels:',
        # primary channel
        r'Primary channel URL',
        r'PRIMARY psk=default ',
        r'"modemConfig": "Bw125Cr48Sf4096"',
        r'"psk": "AQ=="',
        # secondary channel
        r'SECONDARY psk=secret ',
        r'"psk": "ipR5DsbJHjWREkCmMKi0M4cA8ksO539Bes31sJAwqDQ="',
    )
    for pattern in expected_patterns:
        assert re.search(pattern, out, re.MULTILINE)
    assert err == ''
def test_turnOffEncryptionOnPrimaryChannel(capsys)

Turn off encryption when there is a psk.

Expand source code
@pytest.mark.unit
def test_turnOffEncryptionOnPrimaryChannel(capsys):
    """Call turnOffEncryptionOnPrimaryChannel() on a primary channel that has a psk set."""
    primary = Channel(index=1, role=1)
    primary.settings.modem_config = 3
    # value from using "--ch-set psk 0x1a1a1a1a2b2b2b2b1a1a1a1a2b2b2b2b1a1a1a1a2b2b2b2b1a1a1a1a2b2b2b2b "
    primary.settings.psk = b'\x1a\x1a\x1a\x1a++++\x1a\x1a\x1a\x1a++++\x1a\x1a\x1a\x1a++++\x1a\x1a\x1a\x1a++++'

    # Every other slot is disabled (role=0).
    disabled = [Channel(index=idx, role=0) for idx in range(2, 9)]

    anode = Node('foo', 'bar', noProto=True)
    anode.channels = [primary] + disabled
    anode.turnOffEncryptionOnPrimaryChannel()

    out, err = capsys.readouterr()
    assert re.search(r'Writing modified channels to device', out)
    assert err == ''
def test_writeConfig(caplog)

Test writeConfig

Expand source code
@pytest.mark.unit
def test_writeConfig(caplog):
    """Call writeConfig() with a RadioConfig present and look for the debug log line."""
    node = Node('foo', 'bar', noProto=True)
    node.radioConfig = RadioConfig()

    with caplog.at_level(logging.DEBUG):
        node.writeConfig()
    logged = caplog.text
    assert re.search(r'Wrote config', logged, re.MULTILINE)
def test_writeConfig_with_no_radioConfig(capsys)

Test writeConfig with no radioConfig.

Expand source code
@pytest.mark.unit
def test_writeConfig_with_no_radioConfig(capsys):
    """Test that writeConfig() exits with status 1 and an error when no radioConfig is set."""
    anode = Node('foo', 'bar', noProto=True)

    with pytest.raises(SystemExit) as pytest_wrapped_e:
        anode.writeConfig()
    # Classes are compared by identity, not equality.
    assert pytest_wrapped_e.type is SystemExit
    assert pytest_wrapped_e.value.code == 1
    out, err = capsys.readouterr()
    assert re.search(r'Error: No RadioConfig has been read', out)
    assert err == ''

Classes

class AdminMessage (**kwargs)

Abstract base class for protocol messages.

Protocol message classes are almost always generated by the protocol compiler. These generated types subclass Message and implement the methods shown below.

Ancestors

  • google.protobuf.message.Message

Class variables

var CONFIRM_SET_CHANNEL_FIELD_NUMBER
var CONFIRM_SET_RADIO_FIELD_NUMBER
var DESCRIPTOR
var EXIT_SIMULATOR_FIELD_NUMBER
var GET_CHANNEL_REQUEST_FIELD_NUMBER
var GET_CHANNEL_RESPONSE_FIELD_NUMBER
var GET_OWNER_REQUEST_FIELD_NUMBER
var GET_OWNER_RESPONSE_FIELD_NUMBER
var GET_RADIO_REQUEST_FIELD_NUMBER
var GET_RADIO_RESPONSE_FIELD_NUMBER
var REBOOT_SECONDS_FIELD_NUMBER
var SET_CHANNEL_FIELD_NUMBER
var SET_OWNER_FIELD_NUMBER
var SET_RADIO_FIELD_NUMBER

Static methods

def FromString(s)
Expand source code
def FromString(s):
  # Create a new instance of `cls` and fill it by merging `s` into it.
  # `cls` is not a parameter; it comes from the enclosing scope.
  message = cls()
  message.MergeFromString(s)
  return message
def RegisterExtension(extension_handle)
Expand source code
def RegisterExtension(extension_handle):
  """Attach ``extension_handle`` to ``cls`` as an extension.

  Sets the handle's ``containing_type`` to ``cls.DESCRIPTOR``, adds the handle
  to the descriptor's file pool and attaches the field helpers for it.
  """
  owner_descriptor = cls.DESCRIPTOR
  extension_handle.containing_type = owner_descriptor
  # TODO(amauryfa): Use cls.MESSAGE_FACTORY.pool when available.
  # pylint: disable=protected-access
  owner_descriptor.file.pool._AddExtensionDescriptor(extension_handle)
  _AttachFieldHelpers(cls, extension_handle)

Instance variables

var confirm_set_channel

Getter for confirm_set_channel.

Expand source code
def getter(self):
  """Return the value stored for ``field``, or ``default_value`` if absent.

  ``field`` and ``default_value`` come from the enclosing scope; the lookup
  is made in ``self._fields``.
  """
  # TODO(protobuf-team): This may be broken since there may not be
  # default_value.  Combine with has_default_value somehow.
  stored_fields = self._fields
  return stored_fields.get(field, default_value)
var confirm_set_radio

Getter for confirm_set_radio.

Expand source code
def getter(self):
  """Return the value stored for ``field``, or ``default_value`` if absent.

  ``field`` and ``default_value`` come from the enclosing scope; the lookup
  is made in ``self._fields``.
  """
  # TODO(protobuf-team): This may be broken since there may not be
  # default_value.  Combine with has_default_value somehow.
  stored_fields = self._fields
  return stored_fields.get(field, default_value)
var exit_simulator

Getter for exit_simulator.

Expand source code
def getter(self):
  """Return the value stored for ``field``, or ``default_value`` if absent.

  ``field`` and ``default_value`` come from the enclosing scope; the lookup
  is made in ``self._fields``.
  """
  # TODO(protobuf-team): This may be broken since there may not be
  # default_value.  Combine with has_default_value somehow.
  stored_fields = self._fields
  return stored_fields.get(field, default_value)
var get_channel_request

Getter for get_channel_request.

Expand source code
def getter(self):
  """Return the value stored for ``field``, or ``default_value`` if absent.

  ``field`` and ``default_value`` come from the enclosing scope; the lookup
  is made in ``self._fields``.
  """
  # TODO(protobuf-team): This may be broken since there may not be
  # default_value.  Combine with has_default_value somehow.
  stored_fields = self._fields
  return stored_fields.get(field, default_value)
var get_channel_response

Getter for get_channel_response.

Expand source code
def getter(self):
  """Return the object stored for ``field``, creating it on first access.

  When ``self._fields`` holds no value for ``field``, a new object is built
  with ``field._default_constructor(self)`` and installed with ``setdefault``.
  """
  existing = self._fields.get(field)
  if existing is not None:
    return existing

  # Construct a new object to represent this field.
  fresh = field._default_constructor(self)

  # setdefault() keeps whichever object reached the dict first, so if another
  # thread preempted us we return its object and discard ours.
  # WARNING:  We are relying on setdefault() being atomic.  This is true
  #   in CPython but we haven't investigated others.
  return self._fields.setdefault(field, fresh)
var get_owner_request

Getter for get_owner_request.

Expand source code
def getter(self):
  """Return the value stored for ``field``, or ``default_value`` if absent.

  ``field`` and ``default_value`` come from the enclosing scope; the lookup
  is made in ``self._fields``.
  """
  # TODO(protobuf-team): This may be broken since there may not be
  # default_value.  Combine with has_default_value somehow.
  stored_fields = self._fields
  return stored_fields.get(field, default_value)
var get_owner_response

Getter for get_owner_response.

Expand source code
def getter(self):
  """Return the object stored for ``field``, creating it on first access.

  When ``self._fields`` holds no value for ``field``, a new object is built
  with ``field._default_constructor(self)`` and installed with ``setdefault``.
  """
  existing = self._fields.get(field)
  if existing is not None:
    return existing

  # Construct a new object to represent this field.
  fresh = field._default_constructor(self)

  # setdefault() keeps whichever object reached the dict first, so if another
  # thread preempted us we return its object and discard ours.
  # WARNING:  We are relying on setdefault() being atomic.  This is true
  #   in CPython but we haven't investigated others.
  return self._fields.setdefault(field, fresh)
var get_radio_request

Getter for get_radio_request.

Expand source code
def getter(self):
  """Return the value stored for ``field``, or ``default_value`` if absent.

  ``field`` and ``default_value`` come from the enclosing scope; the lookup
  is made in ``self._fields``.
  """
  # TODO(protobuf-team): This may be broken since there may not be
  # default_value.  Combine with has_default_value somehow.
  stored_fields = self._fields
  return stored_fields.get(field, default_value)
var get_radio_response

Getter for get_radio_response.

Expand source code
def getter(self):
  """Return the object stored for ``field``, creating it on first access.

  When ``self._fields`` holds no value for ``field``, a new object is built
  with ``field._default_constructor(self)`` and installed with ``setdefault``.
  """
  existing = self._fields.get(field)
  if existing is not None:
    return existing

  # Construct a new object to represent this field.
  fresh = field._default_constructor(self)

  # setdefault() keeps whichever object reached the dict first, so if another
  # thread preempted us we return its object and discard ours.
  # WARNING:  We are relying on setdefault() being atomic.  This is true
  #   in CPython but we haven't investigated others.
  return self._fields.setdefault(field, fresh)
var reboot_seconds

Getter for reboot_seconds.

Expand source code
def getter(self):
  """Return the value stored for ``field``, or ``default_value`` if absent.

  ``field`` and ``default_value`` come from the enclosing scope; the lookup
  is made in ``self._fields``.
  """
  # TODO(protobuf-team): This may be broken since there may not be
  # default_value.  Combine with has_default_value somehow.
  stored_fields = self._fields
  return stored_fields.get(field, default_value)
var set_channel

Getter for set_channel.

Expand source code
def getter(self):
  """Return the object stored for ``field``, creating it on first access.

  When ``self._fields`` holds no value for ``field``, a new object is built
  with ``field._default_constructor(self)`` and installed with ``setdefault``.
  """
  existing = self._fields.get(field)
  if existing is not None:
    return existing

  # Construct a new object to represent this field.
  fresh = field._default_constructor(self)

  # setdefault() keeps whichever object reached the dict first, so if another
  # thread preempted us we return its object and discard ours.
  # WARNING:  We are relying on setdefault() being atomic.  This is true
  #   in CPython but we haven't investigated others.
  return self._fields.setdefault(field, fresh)
var set_owner

Getter for set_owner.

Expand source code
def getter(self):
  """Return the object stored for ``field``, creating it on first access.

  When ``self._fields`` holds no value for ``field``, a new object is built
  with ``field._default_constructor(self)`` and installed with ``setdefault``.
  """
  existing = self._fields.get(field)
  if existing is not None:
    return existing

  # Construct a new object to represent this field.
  fresh = field._default_constructor(self)

  # setdefault() keeps whichever object reached the dict first, so if another
  # thread preempted us we return its object and discard ours.
  # WARNING:  We are relying on setdefault() being atomic.  This is true
  #   in CPython but we haven't investigated others.
  return self._fields.setdefault(field, fresh)
var set_radio

Getter for set_radio.

Expand source code
def getter(self):
  """Return the object stored for ``field``, creating it on first access.

  When ``self._fields`` holds no value for ``field``, a new object is built
  with ``field._default_constructor(self)`` and installed with ``setdefault``.
  """
  existing = self._fields.get(field)
  if existing is not None:
    return existing

  # Construct a new object to represent this field.
  fresh = field._default_constructor(self)

  # setdefault() keeps whichever object reached the dict first, so if another
  # thread preempted us we return its object and discard ours.
  # WARNING:  We are relying on setdefault() being atomic.  This is true
  #   in CPython but we haven't investigated others.
  return self._fields.setdefault(field, fresh)

Methods

def ByteSize(self)
Expand source code
def ByteSize(self):
  """Return the cached byte size, recomputing it first if it is marked dirty.

  A recomputed size is stored in ``_cached_byte_size`` and the dirty flags on
  this message and on ``_listener_for_children`` are cleared.
  """
  if not self._cached_byte_size_dirty:
    return self._cached_byte_size

  msg_descriptor = self.DESCRIPTOR
  if msg_descriptor.GetOptions().map_entry:
    # Fields of map entry should always be serialized.
    by_name = msg_descriptor.fields_by_name
    total = by_name['key']._sizer(self.key)
    total += by_name['value']._sizer(self.value)
  else:
    # Sizes of the listed fields plus the raw unknown tag/value pairs.
    total = sum(fdesc._sizer(fval) for fdesc, fval in self.ListFields())
    total += sum(len(tag) + len(val) for tag, val in self._unknown_fields)

  self._cached_byte_size = total
  self._cached_byte_size_dirty = False
  self._listener_for_children.dirty = False
  return total
def Clear(self)
Expand source code
def _Clear(self):
  """Reset fields, unknown fields and oneof state, then call ``_Modified()``."""
  unknown_set = self._unknown_field_set

  # Clear fields.
  self._fields = {}
  self._unknown_fields = ()
  # pylint: disable=protected-access
  if unknown_set is not None:
    unknown_set._clear()
    self._unknown_field_set = None

  self._oneofs = {}
  self._Modified()
def ClearField(self, field_name)
Expand source code
def ClearField(self, field_name):
  """Clear the field or oneof named ``field_name`` on this message.

  Args:
    field_name: Name resolved first against
      ``message_descriptor.fields_by_name`` and then against
      ``message_descriptor.oneofs_by_name``.

  Raises:
    ValueError: If ``field_name`` is in neither mapping.
  """
  try:
    field = message_descriptor.fields_by_name[field_name]
  except KeyError:
    try:
      field = message_descriptor.oneofs_by_name[field_name]
      if field in self._oneofs:
        # A oneof name clears whichever member field is recorded for it.
        field = self._oneofs[field]
      else:
        # Nothing recorded for this oneof: return without calling _Modified().
        return
    except KeyError:
      raise ValueError('Protocol message %s has no "%s" field.' %
                       (message_descriptor.name, field_name))

  if field in self._fields:
    # To match the C++ implementation, we need to invalidate iterators
    # for map fields when ClearField() happens.
    if hasattr(self._fields[field], 'InvalidateIterators'):
      self._fields[field].InvalidateIterators()

    # Note:  If the field is a sub-message, its listener will still point
    #   at us.  That's fine, because the worst that can happen is that it
    #   will call _Modified() and invalidate our byte size.  Big deal.
    del self._fields[field]

    # Drop the oneof bookkeeping only if it still points at this field.
    if self._oneofs.get(field.containing_oneof, None) is field:
      del self._oneofs[field.containing_oneof]

  # Always call _Modified() -- even if nothing was changed, this is
  # a mutating method, and thus calling it should cause the field to become
  # present in the parent message.
  self._Modified()
def DiscardUnknownFields(self)
Expand source code
def _DiscardUnknownFields(self):
  """Drop this message's unknown fields and recurse into listed sub-messages.

  Only fields whose ``cpp_type`` is ``CPPTYPE_MESSAGE`` are visited: values of
  message maps, elements of repeated fields, or the single value itself.
  """
  self._unknown_fields = []
  self._unknown_field_set = None      # pylint: disable=protected-access
  for fdesc, fval in self.ListFields():
    if fdesc.cpp_type != _FieldDescriptor.CPPTYPE_MESSAGE:
      continue
    if _IsMapField(fdesc):
      if _IsMessageMapField(fdesc):
        for map_key in fval:
          fval[map_key].DiscardUnknownFields()
    elif fdesc.label == _FieldDescriptor.LABEL_REPEATED:
      for element in fval:
        element.DiscardUnknownFields()
    else:
      fval.DiscardUnknownFields()
def FindInitializationErrors(self)

Finds required fields which are not initialized.

Returns

A list of strings. Each string is a path to an uninitialized field from the top-level message, e.g. "foo.bar[5].baz".

Expand source code
def FindInitializationErrors(self):
  """Finds required fields which are not initialized.

  Returns:
    A list of strings.  Each string is a path to an uninitialized field from
    the top-level message, e.g. "foo.bar[5].baz".
  """

  # Required fields missing directly on this message.
  missing = [req.name for req in required_fields
             if not self.HasField(req.name)]

  # Paths reported by sub-messages, prefixed with the path to reach them.
  for fdesc, fval in self.ListFields():
    if fdesc.cpp_type != _FieldDescriptor.CPPTYPE_MESSAGE:
      continue

    label = '(%s)' % fdesc.full_name if fdesc.is_extension else fdesc.name

    if _IsMapField(fdesc):
      # ScalarMaps can't have any initialization errors.
      if _IsMessageMapField(fdesc):
        for map_key in fval:
          entry = fval[map_key]
          path = '%s[%s].' % (label, map_key)
          nested = entry.FindInitializationErrors()
          missing += [path + err for err in nested]
    elif fdesc.label == _FieldDescriptor.LABEL_REPEATED:
      for position, entry in enumerate(fval):
        path = '%s[%d].' % (label, position)
        nested = entry.FindInitializationErrors()
        missing += [path + err for err in nested]
    else:
      path = label + '.'
      nested = fval.FindInitializationErrors()
      missing += [path + err for err in nested]

  return missing
def HasField(self, field_name)
Expand source code
def HasField(self, field_name):
  """Report whether the field or oneof named ``field_name`` is set.

  Raises:
    ValueError: If ``field_name`` is not a key of ``hassable_fields``.
  """
  try:
    target = hassable_fields[field_name]
  except KeyError:
    raise ValueError(error_msg % (message_descriptor.full_name, field_name))

  if isinstance(target, descriptor_mod.OneofDescriptor):
    # Delegate to the member recorded for this oneof; a KeyError yields False.
    try:
      return HasField(self, self._oneofs[target].name)
    except KeyError:
      return False

  if target.cpp_type != _FieldDescriptor.CPPTYPE_MESSAGE:
    return target in self._fields

  # Message fields also need to be flagged as present in the parent.
  stored = self._fields.get(target)
  return stored is not None and stored._is_present_in_parent
def IsInitialized(self, errors=None)

Checks if all required fields of a message are set.

Args

errors
A list which, if provided, will be populated with the field paths of all missing required fields.

Returns

True iff the specified message has all required fields set.

Expand source code
def IsInitialized(self, errors=None):
  """Checks if all required fields of a message are set.

  Args:
    errors:  A list which, if provided, will be populated with the field
             paths of all missing required fields.

  Returns:
    True iff the specified message has all required fields set.
  """

  # Performance is critical so we avoid HasField() and ListFields().

  # First pass: every required field must be stored, and a required message
  # field must additionally be flagged as present in its parent.
  for field in required_fields:
    if (field not in self._fields or
        (field.cpp_type == _FieldDescriptor.CPPTYPE_MESSAGE and
         not self._fields[field]._is_present_in_parent)):
      if errors is not None:
        errors.extend(self.FindInitializationErrors())
      return False

  # Second pass: recurse into stored message-typed fields.
  for field, value in list(self._fields.items()):  # dict can change size!
    if field.cpp_type == _FieldDescriptor.CPPTYPE_MESSAGE:
      if field.label == _FieldDescriptor.LABEL_REPEATED:
        # Repeated fields whose message type is a map entry are skipped.
        if (field.message_type.has_options and
            field.message_type.GetOptions().map_entry):
          continue
        for element in value:
          if not element.IsInitialized():
            if errors is not None:
              errors.extend(self.FindInitializationErrors())
            return False
      elif value._is_present_in_parent and not value.IsInitialized():
        if errors is not None:
          errors.extend(self.FindInitializationErrors())
        return False

  return True
def ListFields(self)
Expand source code
def ListFields(self):
  """Return the ``_fields`` items accepted by ``_IsPresent``, by field number.

  Each item is a (field, value) pair; the list is sorted ascending on the
  field's ``number``.
  """
  present = (pair for pair in self._fields.items() if _IsPresent(pair))
  return sorted(present, key=lambda pair: pair[0].number)
def MergeFrom(self, msg)
Expand source code
def MergeFrom(self, msg):
  """Merge the fields and unknown fields of ``msg`` into this message.

  Args:
    msg: Another instance of ``cls``; must not be this message itself.

  Raises:
    TypeError: If ``msg`` is not an instance of ``cls``.
  """
  if not isinstance(msg, cls):
    raise TypeError(
        'Parameter to MergeFrom() must be instance of same class: '
        'expected %s got %s.' % (_FullyQualifiedClassName(cls),
                                 _FullyQualifiedClassName(msg.__class__)))

  assert msg is not self
  self._Modified()

  fields = self._fields

  for field, value in msg._fields.items():
    if field.label == LABEL_REPEATED:
      # Repeated field: merge into our container, creating it if needed.
      field_value = fields.get(field)
      if field_value is None:
        # Construct a new object to represent this field.
        field_value = field._default_constructor(self)
        fields[field] = field_value
      field_value.MergeFrom(value)
    elif field.cpp_type == CPPTYPE_MESSAGE:
      # Singular message field: merged only if present in the source message.
      if value._is_present_in_parent:
        field_value = fields.get(field)
        if field_value is None:
          # Construct a new object to represent this field.
          field_value = field._default_constructor(self)
          fields[field] = field_value
        field_value.MergeFrom(value)
    else:
      # Any other field: the source value replaces ours.
      self._fields[field] = value
      if field.containing_oneof:
        self._UpdateOneofState(field)

  if msg._unknown_fields:
    # Start a fresh list when ours is empty, then append the source's entries.
    if not self._unknown_fields:
      self._unknown_fields = []
    self._unknown_fields.extend(msg._unknown_fields)
    # pylint: disable=protected-access
    if self._unknown_field_set is None:
      self._unknown_field_set = containers.UnknownFieldSet()
    self._unknown_field_set._extend(msg._unknown_field_set)
def MergeFromString(self, serialized)
Expand source code
def MergeFromString(self, serialized):
  """Parse ``serialized`` into this message and return its length.

  Raises:
    message_mod.DecodeError: If parsing stops before the end of the input, or
      if ``IndexError``, ``TypeError`` or ``struct.error`` is raised while
      parsing.
  """
  view = memoryview(serialized)
  total = len(view)
  try:
    consumed = self._InternalParse(view, 0, total)
    if consumed != total:
      # The only reason _InternalParse would return early is if it
      # encountered an end-group tag.
      raise message_mod.DecodeError('Unexpected end-group tag.')
  except (IndexError, TypeError):
    # Now ord(buf[p:p+1]) == ord('') gets TypeError.
    raise message_mod.DecodeError('Truncated message.')
  except struct.error as e:
    raise message_mod.DecodeError(e)
  return total   # Return this for legacy reasons.
def SerializePartialToString(self, **kwargs)
Expand source code
def SerializePartialToString(self, **kwargs):
  """Serialize this message through ``_InternalSerialize`` and return the bytes.

  Keyword arguments are forwarded unchanged.  No initialization check is made
  here.
  """
  buffer = BytesIO()
  writer = buffer.write
  self._InternalSerialize(writer, **kwargs)
  return buffer.getvalue()
def SerializeToString(self, **kwargs)
Expand source code
def SerializeToString(self, **kwargs):
  """Serialize this message after verifying that it is initialized.

  Keyword arguments are forwarded to ``SerializePartialToString``.

  Raises:
    message_mod.EncodeError: If ``IsInitialized()`` returns a false value; the
      message lists the paths from ``FindInitializationErrors()``.
  """
  # Check if the message has all of its required fields set.
  if self.IsInitialized():
    return self.SerializePartialToString(**kwargs)
  raise message_mod.EncodeError(
      'Message %s is missing required fields: %s' % (
      self.DESCRIPTOR.full_name, ','.join(self.FindInitializationErrors())))
def SetInParent(self)

Sets the _cached_byte_size_dirty bit to true, and propagates this to our listener iff this was a state change.

Expand source code
def Modified(self):
  """Sets the _cached_byte_size_dirty bit to true,
  and propagates this to our listener iff this was a state change.
  """

  # Note:  Some callers check _cached_byte_size_dirty before calling
  #   _Modified() as an extra optimization.  So, if this method is ever
  #   changed such that it does stuff even when _cached_byte_size_dirty is
  #   already true, the callers need to be updated.
  if self._cached_byte_size_dirty:
    return

  self._cached_byte_size_dirty = True
  self._listener_for_children.dirty = True
  self._is_present_in_parent = True
  self._listener.Modified()
def UnknownFields(self)
Expand source code
def _UnknownFields(self):
  """Return ``_unknown_field_set``, creating an empty set on first use."""
  # pylint: disable=protected-access
  current = self._unknown_field_set
  if current is None:
    current = containers.UnknownFieldSet()
    self._unknown_field_set = current
  return current
def WhichOneof(self, oneof_name)

Returns the name of the currently set field inside a oneof, or None.

Expand source code
def WhichOneof(self, oneof_name):
  """Returns the name of the currently set field inside a oneof, or None.

  Raises:
    ValueError: If ``oneof_name`` is not in
      ``message_descriptor.oneofs_by_name``.
  """
  try:
    oneof = message_descriptor.oneofs_by_name[oneof_name]
  except KeyError:
    raise ValueError(
        'Protocol message has no oneof "%s" field.' % oneof_name)

  # A member only counts when it is recorded for the oneof and HasField agrees.
  active = self._oneofs.get(oneof)
  if active is None or not self.HasField(active.name):
    return None
  return active.name
class Channel (**kwargs)

Abstract base class for protocol messages.

Protocol message classes are almost always generated by the protocol compiler. These generated types subclass Message and implement the methods shown below.

Ancestors

  • google.protobuf.message.Message

Class variables

var DESCRIPTOR
var DISABLED
var INDEX_FIELD_NUMBER
var PRIMARY
var ROLE_FIELD_NUMBER
var Role
var SECONDARY
var SETTINGS_FIELD_NUMBER

Static methods

def FromString(s)
Expand source code
def FromString(s):
  """Create a new ``cls`` instance and populate it by parsing ``s``.

  ``cls`` comes from the enclosing scope; ``s`` is handed to the new
  instance's ``MergeFromString`` and the populated instance is returned.
  """
  parsed = cls()
  parsed.MergeFromString(s)
  return parsed
def RegisterExtension(extension_handle)
Expand source code
def RegisterExtension(extension_handle):
  """Attach ``extension_handle`` to ``cls`` as an extension.

  Sets the handle's ``containing_type`` to ``cls.DESCRIPTOR``, adds the handle
  to the descriptor's file pool and attaches the field helpers for it.
  """
  owner_descriptor = cls.DESCRIPTOR
  extension_handle.containing_type = owner_descriptor
  # TODO(amauryfa): Use cls.MESSAGE_FACTORY.pool when available.
  # pylint: disable=protected-access
  owner_descriptor.file.pool._AddExtensionDescriptor(extension_handle)
  _AttachFieldHelpers(cls, extension_handle)

Instance variables

var index

Getter for index.

Expand source code
def getter(self):
  """Return the value stored for ``field``, or ``default_value`` if absent.

  ``field`` and ``default_value`` come from the enclosing scope; the lookup
  is made in ``self._fields``.
  """
  # TODO(protobuf-team): This may be broken since there may not be
  # default_value.  Combine with has_default_value somehow.
  stored_fields = self._fields
  return stored_fields.get(field, default_value)
var role

Getter for role.

Expand source code
def getter(self):
  """Return the value stored for ``field``, or ``default_value`` if absent.

  ``field`` and ``default_value`` come from the enclosing scope; the lookup
  is made in ``self._fields``.
  """
  # TODO(protobuf-team): This may be broken since there may not be
  # default_value.  Combine with has_default_value somehow.
  stored_fields = self._fields
  return stored_fields.get(field, default_value)
var settings

Getter for settings.

Expand source code
def getter(self):
  """Return the object stored for ``field``, creating it on first access.

  When ``self._fields`` holds no value for ``field``, a new object is built
  with ``field._default_constructor(self)`` and installed with ``setdefault``.
  """
  existing = self._fields.get(field)
  if existing is not None:
    return existing

  # Construct a new object to represent this field.
  fresh = field._default_constructor(self)

  # setdefault() keeps whichever object reached the dict first, so if another
  # thread preempted us we return its object and discard ours.
  # WARNING:  We are relying on setdefault() being atomic.  This is true
  #   in CPython but we haven't investigated others.
  return self._fields.setdefault(field, fresh)

Methods

def ByteSize(self)
Expand source code
def ByteSize(self):
  """Return the cached byte size, recomputing it first if it is marked dirty.

  A recomputed size is stored in ``_cached_byte_size`` and the dirty flags on
  this message and on ``_listener_for_children`` are cleared.
  """
  if not self._cached_byte_size_dirty:
    return self._cached_byte_size

  msg_descriptor = self.DESCRIPTOR
  if msg_descriptor.GetOptions().map_entry:
    # Fields of map entry should always be serialized.
    by_name = msg_descriptor.fields_by_name
    total = by_name['key']._sizer(self.key)
    total += by_name['value']._sizer(self.value)
  else:
    # Sizes of the listed fields plus the raw unknown tag/value pairs.
    total = sum(fdesc._sizer(fval) for fdesc, fval in self.ListFields())
    total += sum(len(tag) + len(val) for tag, val in self._unknown_fields)

  self._cached_byte_size = total
  self._cached_byte_size_dirty = False
  self._listener_for_children.dirty = False
  return total
def Clear(self)
Expand source code
def _Clear(self):
  """Reset fields, unknown fields and oneof state, then call ``_Modified()``."""
  unknown_set = self._unknown_field_set

  # Clear fields.
  self._fields = {}
  self._unknown_fields = ()
  # pylint: disable=protected-access
  if unknown_set is not None:
    unknown_set._clear()
    self._unknown_field_set = None

  self._oneofs = {}
  self._Modified()
def ClearField(self, field_name)
Expand source code
def ClearField(self, field_name):
  """Clear the field or oneof named ``field_name`` on this message.

  Args:
    field_name: Name resolved first against
      ``message_descriptor.fields_by_name`` and then against
      ``message_descriptor.oneofs_by_name``.

  Raises:
    ValueError: If ``field_name`` is in neither mapping.
  """
  try:
    field = message_descriptor.fields_by_name[field_name]
  except KeyError:
    try:
      field = message_descriptor.oneofs_by_name[field_name]
      if field in self._oneofs:
        # A oneof name clears whichever member field is recorded for it.
        field = self._oneofs[field]
      else:
        # Nothing recorded for this oneof: return without calling _Modified().
        return
    except KeyError:
      raise ValueError('Protocol message %s has no "%s" field.' %
                       (message_descriptor.name, field_name))

  if field in self._fields:
    # To match the C++ implementation, we need to invalidate iterators
    # for map fields when ClearField() happens.
    if hasattr(self._fields[field], 'InvalidateIterators'):
      self._fields[field].InvalidateIterators()

    # Note:  If the field is a sub-message, its listener will still point
    #   at us.  That's fine, because the worst that can happen is that it
    #   will call _Modified() and invalidate our byte size.  Big deal.
    del self._fields[field]

    # Drop the oneof bookkeeping only if it still points at this field.
    if self._oneofs.get(field.containing_oneof, None) is field:
      del self._oneofs[field.containing_oneof]

  # Always call _Modified() -- even if nothing was changed, this is
  # a mutating method, and thus calling it should cause the field to become
  # present in the parent message.
  self._Modified()
def DiscardUnknownFields(self)
Expand source code
def _DiscardUnknownFields(self):
  """Drop this message's unknown fields and recurse into listed sub-messages.

  Only fields whose ``cpp_type`` is ``CPPTYPE_MESSAGE`` are visited: values of
  message maps, elements of repeated fields, or the single value itself.
  """
  self._unknown_fields = []
  self._unknown_field_set = None      # pylint: disable=protected-access
  for fdesc, fval in self.ListFields():
    if fdesc.cpp_type != _FieldDescriptor.CPPTYPE_MESSAGE:
      continue
    if _IsMapField(fdesc):
      if _IsMessageMapField(fdesc):
        for map_key in fval:
          fval[map_key].DiscardUnknownFields()
    elif fdesc.label == _FieldDescriptor.LABEL_REPEATED:
      for element in fval:
        element.DiscardUnknownFields()
    else:
      fval.DiscardUnknownFields()
def FindInitializationErrors(self)

Finds required fields which are not initialized.

Returns

A list of strings. Each string is a path to an uninitialized field from the top-level message, e.g. "foo.bar[5].baz".

Expand source code
def FindInitializationErrors(self):
  """Finds required fields which are not initialized.

  Returns:
    A list of strings.  Each string is a path to an uninitialized field from
    the top-level message, e.g. "foo.bar[5].baz".
  """

  # Required fields missing directly on this message.
  missing = [req.name for req in required_fields
             if not self.HasField(req.name)]

  # Paths reported by sub-messages, prefixed with the path to reach them.
  for fdesc, fval in self.ListFields():
    if fdesc.cpp_type != _FieldDescriptor.CPPTYPE_MESSAGE:
      continue

    label = '(%s)' % fdesc.full_name if fdesc.is_extension else fdesc.name

    if _IsMapField(fdesc):
      # ScalarMaps can't have any initialization errors.
      if _IsMessageMapField(fdesc):
        for map_key in fval:
          entry = fval[map_key]
          path = '%s[%s].' % (label, map_key)
          nested = entry.FindInitializationErrors()
          missing += [path + err for err in nested]
    elif fdesc.label == _FieldDescriptor.LABEL_REPEATED:
      for position, entry in enumerate(fval):
        path = '%s[%d].' % (label, position)
        nested = entry.FindInitializationErrors()
        missing += [path + err for err in nested]
    else:
      path = label + '.'
      nested = fval.FindInitializationErrors()
      missing += [path + err for err in nested]

  return missing
def HasField(self, field_name)
Expand source code
def HasField(self, field_name):
  """Report whether the field or oneof named ``field_name`` is set.

  Raises:
    ValueError: If ``field_name`` is not a key of ``hassable_fields``.
  """
  try:
    target = hassable_fields[field_name]
  except KeyError:
    raise ValueError(error_msg % (message_descriptor.full_name, field_name))

  if isinstance(target, descriptor_mod.OneofDescriptor):
    # Delegate to the member recorded for this oneof; a KeyError yields False.
    try:
      return HasField(self, self._oneofs[target].name)
    except KeyError:
      return False

  if target.cpp_type != _FieldDescriptor.CPPTYPE_MESSAGE:
    return target in self._fields

  # Message fields also need to be flagged as present in the parent.
  stored = self._fields.get(target)
  return stored is not None and stored._is_present_in_parent
def IsInitialized(self, errors=None)

Checks if all required fields of a message are set.

Args

errors
A list which, if provided, will be populated with the field paths of all missing required fields.

Returns

True iff the specified message has all required fields set.

Expand source code
def IsInitialized(self, errors=None):
  """Checks if all required fields of a message are set.

  Args:
    errors:  A list which, if provided, will be populated with the field
             paths of all missing required fields.

  Returns:
    True iff the specified message has all required fields set.
  """

  # Performance is critical so we avoid HasField() and ListFields().

  # First pass: every required field must be stored, and a required message
  # field must additionally be flagged as present in its parent.
  for field in required_fields:
    if (field not in self._fields or
        (field.cpp_type == _FieldDescriptor.CPPTYPE_MESSAGE and
         not self._fields[field]._is_present_in_parent)):
      if errors is not None:
        errors.extend(self.FindInitializationErrors())
      return False

  # Second pass: recurse into stored message-typed fields.
  for field, value in list(self._fields.items()):  # dict can change size!
    if field.cpp_type == _FieldDescriptor.CPPTYPE_MESSAGE:
      if field.label == _FieldDescriptor.LABEL_REPEATED:
        # Repeated fields whose message type is a map entry are skipped.
        if (field.message_type.has_options and
            field.message_type.GetOptions().map_entry):
          continue
        for element in value:
          if not element.IsInitialized():
            if errors is not None:
              errors.extend(self.FindInitializationErrors())
            return False
      elif value._is_present_in_parent and not value.IsInitialized():
        if errors is not None:
          errors.extend(self.FindInitializationErrors())
        return False

  return True
def ListFields(self)
Expand source code
def ListFields(self):
  """Return the ``_fields`` items accepted by ``_IsPresent``, by field number.

  Each item is a (field, value) pair; the list is sorted ascending on the
  field's ``number``.
  """
  present = (pair for pair in self._fields.items() if _IsPresent(pair))
  return sorted(present, key=lambda pair: pair[0].number)
def MergeFrom(self, msg)
Expand source code
def MergeFrom(self, msg):
  """Merge the fields and unknown fields of ``msg`` into this message.

  Args:
    msg: Another instance of ``cls``; must not be this message itself.

  Raises:
    TypeError: If ``msg`` is not an instance of ``cls``.
  """
  if not isinstance(msg, cls):
    raise TypeError(
        'Parameter to MergeFrom() must be instance of same class: '
        'expected %s got %s.' % (_FullyQualifiedClassName(cls),
                                 _FullyQualifiedClassName(msg.__class__)))

  assert msg is not self
  self._Modified()

  fields = self._fields

  for field, value in msg._fields.items():
    if field.label == LABEL_REPEATED:
      # Repeated field: merge into our container, creating it if needed.
      field_value = fields.get(field)
      if field_value is None:
        # Construct a new object to represent this field.
        field_value = field._default_constructor(self)
        fields[field] = field_value
      field_value.MergeFrom(value)
    elif field.cpp_type == CPPTYPE_MESSAGE:
      # Singular message field: merged only if present in the source message.
      if value._is_present_in_parent:
        field_value = fields.get(field)
        if field_value is None:
          # Construct a new object to represent this field.
          field_value = field._default_constructor(self)
          fields[field] = field_value
        field_value.MergeFrom(value)
    else:
      # Any other field: the source value replaces ours.
      self._fields[field] = value
      if field.containing_oneof:
        self._UpdateOneofState(field)

  if msg._unknown_fields:
    # Start a fresh list when ours is empty, then append the source's entries.
    if not self._unknown_fields:
      self._unknown_fields = []
    self._unknown_fields.extend(msg._unknown_fields)
    # pylint: disable=protected-access
    if self._unknown_field_set is None:
      self._unknown_field_set = containers.UnknownFieldSet()
    self._unknown_field_set._extend(msg._unknown_field_set)
def MergeFromString(self, serialized)
Expand source code
def MergeFromString(self, serialized):
  """Parse ``serialized`` into this message and return its length.

  Raises:
    message_mod.DecodeError: If parsing stops before the end of the input, or
      if ``IndexError``, ``TypeError`` or ``struct.error`` is raised while
      parsing.
  """
  view = memoryview(serialized)
  total = len(view)
  try:
    consumed = self._InternalParse(view, 0, total)
    if consumed != total:
      # The only reason _InternalParse would return early is if it
      # encountered an end-group tag.
      raise message_mod.DecodeError('Unexpected end-group tag.')
  except (IndexError, TypeError):
    # Now ord(buf[p:p+1]) == ord('') gets TypeError.
    raise message_mod.DecodeError('Truncated message.')
  except struct.error as e:
    raise message_mod.DecodeError(e)
  return total   # Return this for legacy reasons.
def SerializePartialToString(self, **kwargs)
Expand source code
def SerializePartialToString(self, **kwargs):
  """Serialize this message through ``_InternalSerialize`` and return the bytes.

  Keyword arguments are forwarded unchanged.  No initialization check is made
  here.
  """
  buffer = BytesIO()
  writer = buffer.write
  self._InternalSerialize(writer, **kwargs)
  return buffer.getvalue()
def SerializeToString(self, **kwargs)
Expand source code
def SerializeToString(self, **kwargs):
  """Serialize this message after verifying that it is initialized.

  Keyword arguments are forwarded to ``SerializePartialToString``.

  Raises:
    message_mod.EncodeError: If ``IsInitialized()`` returns a false value; the
      message lists the paths from ``FindInitializationErrors()``.
  """
  # Check if the message has all of its required fields set.
  if self.IsInitialized():
    return self.SerializePartialToString(**kwargs)
  raise message_mod.EncodeError(
      'Message %s is missing required fields: %s' % (
      self.DESCRIPTOR.full_name, ','.join(self.FindInitializationErrors())))
def SetInParent(self)

Sets the _cached_byte_size_dirty bit to true, and propagates this to our listener iff this was a state change.

Expand source code
def Modified(self):
  """Sets the _cached_byte_size_dirty bit to true,
  and propagates this to our listener iff this was a state change.
  """

  # Note:  Some callers check _cached_byte_size_dirty before calling
  #   _Modified() as an extra optimization.  So, if this method is ever
  #   changed such that it does stuff even when _cached_byte_size_dirty is
  #   already true, the callers need to be updated.
  if self._cached_byte_size_dirty:
    return

  self._cached_byte_size_dirty = True
  self._listener_for_children.dirty = True
  self._is_present_in_parent = True
  self._listener.Modified()
def UnknownFields(self)
Expand source code
def _UnknownFields(self):
  """Return ``_unknown_field_set``, creating an empty set on first use."""
  # pylint: disable=protected-access
  current = self._unknown_field_set
  if current is None:
    current = containers.UnknownFieldSet()
    self._unknown_field_set = current
  return current
def WhichOneof(self, oneof_name)

Returns the name of the currently set field inside a oneof, or None.

Expand source code
def WhichOneof(self, oneof_name):
  """Returns the name of the currently set field inside a oneof, or None.

  Raises:
    ValueError: If ``oneof_name`` is not in
      ``message_descriptor.oneofs_by_name``.
  """
  try:
    oneof = message_descriptor.oneofs_by_name[oneof_name]
  except KeyError:
    raise ValueError(
        'Protocol message has no oneof "%s" field.' % oneof_name)

  # A member only counts when it is recorded for the oneof and HasField agrees.
  active = self._oneofs.get(oneof)
  if active is None or not self.HasField(active.name):
    return None
  return active.name
class RadioConfig (**kwargs)

Abstract base class for protocol messages.

Protocol message classes are almost always generated by the protocol compiler. These generated types subclass Message and implement the methods shown below.

Ancestors

  • google.protobuf.message.Message

Class variables

var DESCRIPTOR
var PREFERENCES_FIELD_NUMBER
var UserPreferences

Abstract base class for protocol messages.

Protocol message classes are almost always generated by the protocol compiler. These generated types subclass Message and implement the methods shown below.

Static methods

def FromString(s)
Expand source code
def FromString(s):
  """Construct a new cls instance and merge the serialized input into it.

  Args:
    s: serialized data, handed unchanged to MergeFromString().

  Returns:
    The newly constructed instance after the merge.
  """
  parsed = cls()
  parsed.MergeFromString(s)
  return parsed
def RegisterExtension(extension_handle)
Expand source code
def RegisterExtension(extension_handle):
  """Attach extension_handle to cls.

  Sets the handle's containing_type to cls.DESCRIPTOR, adds the handle to
  the descriptor file's pool, and then calls _AttachFieldHelpers for it.
  """
  owner = cls.DESCRIPTOR
  extension_handle.containing_type = owner
  # TODO(amauryfa): Use cls.MESSAGE_FACTORY.pool when available.
  # pylint: disable=protected-access
  pool = owner.file.pool
  pool._AddExtensionDescriptor(extension_handle)
  _AttachFieldHelpers(cls, extension_handle)

Instance variables

var preferences

Getter for preferences.

Expand source code
def getter(self):
  """Return the value stored for `field`, creating a default on first access."""
  existing = self._fields.get(field)
  if existing is not None:
    return existing

  # Nothing stored yet: construct a new object to represent this field.
  fresh = field._default_constructor(self)

  # Atomically check whether another thread got here first.  If not, our new
  # object is stored; if so, we adopt theirs and throw ours away.
  # WARNING:  This relies on setdefault() being atomic.  That holds in
  #   CPython but has not been investigated for other implementations.
  return self._fields.setdefault(field, fresh)

Methods

def ByteSize(self)
Expand source code
def ByteSize(self):
  """Return the byte size of the message, recomputing it only when dirty.

  If _cached_byte_size_dirty is false the cached value is returned as is.
  Otherwise the size is recomputed, stored in _cached_byte_size, and the
  dirty flags on this object and on _listener_for_children are cleared.
  """
  if not self._cached_byte_size_dirty:
    return self._cached_byte_size

  descriptor = self.DESCRIPTOR
  if descriptor.GetOptions().map_entry:
    # Fields of map entry should always be serialized.
    by_name = descriptor.fields_by_name
    total = by_name['key']._sizer(self.key) + by_name['value']._sizer(self.value)
  else:
    total = sum(fd._sizer(val) for fd, val in self.ListFields())
    total += sum(len(tag) + len(payload)
                 for tag, payload in self._unknown_fields)

  self._cached_byte_size = total
  self._cached_byte_size_dirty = False
  self._listener_for_children.dirty = False
  return total
def Clear(self)
Expand source code
def _Clear(self):
  """Reset the stored fields, unknown fields and oneof state, then call _Modified()."""
  self._fields = dict()
  self._unknown_fields = tuple()

  # pylint: disable=protected-access
  stale_set = self._unknown_field_set
  if stale_set is not None:
    # Empty the existing set object before dropping our reference to it.
    stale_set._clear()
    self._unknown_field_set = None

  self._oneofs = dict()
  self._Modified()
def ClearField(self, field_name)
Expand source code
def ClearField(self, field_name):
  """Remove the named field, or the recorded member of the named oneof.

  Args:
    field_name: a key of message_descriptor.fields_by_name or, failing that,
      a key of message_descriptor.oneofs_by_name.

  Raises:
    ValueError: if a KeyError occurs while resolving field_name as a oneof,
      after the plain field lookup has already failed.
  """
  try:
    field = message_descriptor.fields_by_name[field_name]
  except KeyError:
    # Not a plain field name; try to interpret it as a oneof name instead.
    try:
      field = message_descriptor.oneofs_by_name[field_name]
      if field in self._oneofs:
        # Swap the oneof for the member field currently recorded for it.
        field = self._oneofs[field]
      else:
        # No member recorded for this oneof: return early.  Note that this
        # path does not reach the _Modified() call at the end.
        return
    except KeyError:
      raise ValueError('Protocol message %s has no "%s" field.' %
                       (message_descriptor.name, field_name))

  if field in self._fields:
    # To match the C++ implementation, we need to invalidate iterators
    # for map fields when ClearField() happens.
    if hasattr(self._fields[field], 'InvalidateIterators'):
      self._fields[field].InvalidateIterators()

    # Note:  If the field is a sub-message, its listener will still point
    #   at us.  That's fine, because the worst that can happen is that it
    #   will call _Modified() and invalidate our byte size.  Big deal.
    del self._fields[field]

    # Forget the oneof bookkeeping only if it still points at this field.
    if self._oneofs.get(field.containing_oneof, None) is field:
      del self._oneofs[field.containing_oneof]

  # Always call _Modified() -- even if nothing was changed, this is
  # a mutating method, and thus calling it should cause the field to become
  # present in the parent message.
  self._Modified()
def DiscardUnknownFields(self)
Expand source code
def _DiscardUnknownFields(self):
  """Drop this message's unknown fields and those of its set sub-messages.

  Resets _unknown_fields and _unknown_field_set, then calls
  DiscardUnknownFields() on every message value reachable through
  ListFields(): message-valued map entries, repeated elements, and singular
  sub-messages.
  """
  self._unknown_fields = []
  self._unknown_field_set = None      # pylint: disable=protected-access

  for field, value in self.ListFields():
    if field.cpp_type != _FieldDescriptor.CPPTYPE_MESSAGE:
      continue

    if _IsMapField(field):
      # Only maps whose values are messages have anything to recurse into.
      if _IsMessageMapField(field):
        for key in value:
          value[key].DiscardUnknownFields()
      continue

    if field.label == _FieldDescriptor.LABEL_REPEATED:
      for sub_message in value:
        sub_message.DiscardUnknownFields()
      continue

    value.DiscardUnknownFields()
def FindInitializationErrors(self)

Finds required fields which are not initialized.

Returns

A list of strings. Each string is a path to an uninitialized field from the top-level message, e.g. "foo.bar[5].baz".

Expand source code
def FindInitializationErrors(self):
  """Finds required fields which are not initialized.

  Returns:
    A list of strings.  Each string is a path to an uninitialized field from
    the top-level message, e.g. "foo.bar[5].baz".
  """

  # Required fields of this message that HasField() reports as unset.
  errors = [required.name for required in required_fields
            if not self.HasField(required.name)]

  # Recurse into every listed message-typed field, prefixing each sub-path.
  for field, value in self.ListFields():
    if field.cpp_type != _FieldDescriptor.CPPTYPE_MESSAGE:
      continue

    name = '(%s)' % field.full_name if field.is_extension else field.name

    if _IsMapField(field):
      # ScalarMaps can't have any initialization errors, so only maps with
      # message values are inspected.
      if _IsMessageMapField(field):
        for key in value:
          element = value[key]
          prefix = '%s[%s].' % (name, key)
          errors.extend(
              prefix + error for error in element.FindInitializationErrors())
    elif field.label == _FieldDescriptor.LABEL_REPEATED:
      for index, element in enumerate(value):
        prefix = '%s[%d].' % (name, index)
        errors.extend(
            prefix + error for error in element.FindInitializationErrors())
    else:
      prefix = name + '.'
      errors.extend(
          prefix + error for error in value.FindInitializationErrors())

  return errors
def HasField(self, field_name)
Expand source code
def HasField(self, field_name):
  """Report whether the named field (or named oneof) is currently set.

  Args:
    field_name: key looked up in hassable_fields.

  Returns:
    For a oneof: the result of HasField on the member recorded in
    self._oneofs, or False when that lookup raises KeyError.  For a
    message-typed field: whether a value is stored and flagged
    _is_present_in_parent.  For any other field: whether it is a key of
    self._fields.

  Raises:
    ValueError: if field_name is not a key of hassable_fields.
  """
  try:
    field = hassable_fields[field_name]
  except KeyError:
    raise ValueError(error_msg % (message_descriptor.full_name, field_name))

  if isinstance(field, descriptor_mod.OneofDescriptor):
    try:
      return HasField(self, self._oneofs[field].name)
    except KeyError:
      return False

  if field.cpp_type != _FieldDescriptor.CPPTYPE_MESSAGE:
    return field in self._fields

  submessage = self._fields.get(field)
  return submessage is not None and submessage._is_present_in_parent
def IsInitialized(self, errors=None)

Checks if all required fields of a message are set.

Args

errors
A list which, if provided, will be populated with the field paths of all missing required fields.

Returns

True iff the specified message has all required fields set.

Expand source code
def IsInitialized(self, errors=None):
  """Checks if all required fields of a message are set.

  Args:
    errors:  A list which, if provided, will be populated with the field
             paths of all missing required fields.

  Returns:
    True iff the specified message has all required fields set.
  """

  # Performance is critical so we avoid HasField() and ListFields().

  # Pass 1: every required field must be stored; a required message-typed
  # field must additionally be flagged as present in its parent.
  for field in required_fields:
    if (field not in self._fields or
        (field.cpp_type == _FieldDescriptor.CPPTYPE_MESSAGE and
         not self._fields[field]._is_present_in_parent)):
      # The full list of paths is only computed when the caller asked for it.
      if errors is not None:
        errors.extend(self.FindInitializationErrors())
      return False

  # Pass 2: recurse into stored message-typed fields.  The items are copied
  # into a list first so the loop works on a snapshot.
  for field, value in list(self._fields.items()):  # dict can change size!
    if field.cpp_type == _FieldDescriptor.CPPTYPE_MESSAGE:
      if field.label == _FieldDescriptor.LABEL_REPEATED:
        # Repeated fields whose message type is a map entry are skipped.
        if (field.message_type.has_options and
            field.message_type.GetOptions().map_entry):
          continue
        for element in value:
          if not element.IsInitialized():
            if errors is not None:
              errors.extend(self.FindInitializationErrors())
            return False
      # Singular sub-messages are only checked when flagged as present.
      elif value._is_present_in_parent and not value.IsInitialized():
        if errors is not None:
          errors.extend(self.FindInitializationErrors())
        return False

  return True
def ListFields(self)
Expand source code
def ListFields(self):
  """Return the items of self._fields accepted by _IsPresent, sorted by field number."""
  present = filter(_IsPresent, self._fields.items())
  return sorted(present, key=lambda pair: pair[0].number)
def MergeFrom(self, msg)
Expand source code
def MergeFrom(self, msg):
  """Merge the stored fields and unknown fields of msg into this message.

  Args:
    msg: the message to merge from; must be an instance of cls and must not
      be this same object (checked with an assert).

  Raises:
    TypeError: if msg is not an instance of cls.
  """
  if not isinstance(msg, cls):
    raise TypeError(
        'Parameter to MergeFrom() must be instance of same class: '
        'expected %s got %s.' % (_FullyQualifiedClassName(cls),
                                 _FullyQualifiedClassName(msg.__class__)))

  assert msg is not self
  self._Modified()

  # Local alias for self._fields; both names refer to the same dict below.
  fields = self._fields

  for field, value in msg._fields.items():
    if field.label == LABEL_REPEATED:
      # Repeated field: make sure we have a container, then merge into it.
      field_value = fields.get(field)
      if field_value is None:
        # Construct a new object to represent this field.
        field_value = field._default_constructor(self)
        fields[field] = field_value
      field_value.MergeFrom(value)
    elif field.cpp_type == CPPTYPE_MESSAGE:
      # Singular sub-message: merged only when flagged present in msg.
      if value._is_present_in_parent:
        field_value = fields.get(field)
        if field_value is None:
          # Construct a new object to represent this field.
          field_value = field._default_constructor(self)
          fields[field] = field_value
        field_value.MergeFrom(value)
    else:
      # Any other field: the value from msg simply replaces ours.
      self._fields[field] = value
      if field.containing_oneof:
        self._UpdateOneofState(field)

  if msg._unknown_fields:
    # Switch to a list when ours is empty so that extend() is available.
    if not self._unknown_fields:
      self._unknown_fields = []
    self._unknown_fields.extend(msg._unknown_fields)
    # pylint: disable=protected-access
    if self._unknown_field_set is None:
      self._unknown_field_set = containers.UnknownFieldSet()
    self._unknown_field_set._extend(msg._unknown_field_set)
def MergeFromString(self, serialized)
Expand source code
def MergeFromString(self, serialized):
  """Parse serialized data into this message via _InternalParse.

  Args:
    serialized: input wrapped in a memoryview before parsing.

  Returns:
    The length of the memoryview (returned for legacy reasons).

  Raises:
    message_mod.DecodeError: if _InternalParse stops before the end of the
      input, or raises IndexError, TypeError or struct.error.
  """
  view = memoryview(serialized)
  total = len(view)
  try:
    consumed = self._InternalParse(view, 0, total)
    if consumed != total:
      # The only reason _InternalParse would return early is if it
      # encountered an end-group tag.
      raise message_mod.DecodeError('Unexpected end-group tag.')
  except (IndexError, TypeError):
    # Now ord(buf[p:p+1]) == ord('') gets TypeError.
    raise message_mod.DecodeError('Truncated message.')
  except struct.error as parse_error:
    raise message_mod.DecodeError(parse_error)
  return total
def SerializePartialToString(self, **kwargs)
Expand source code
def SerializePartialToString(self, **kwargs):
  """Serialize via _InternalSerialize into an in-memory buffer and return its bytes.

  Keyword arguments are forwarded unchanged to _InternalSerialize().
  """
  sink = BytesIO()
  emit = sink.write
  self._InternalSerialize(emit, **kwargs)
  return sink.getvalue()
def SerializeToString(self, **kwargs)
Expand source code
def SerializeToString(self, **kwargs):
  """Serialize the message, refusing to do so when it is not initialized.

  All keyword arguments are passed through unchanged to
  SerializePartialToString().

  Returns:
    Whatever SerializePartialToString(**kwargs) returns.

  Raises:
    message_mod.EncodeError: if IsInitialized() is false.  The text names
      the message (DESCRIPTOR.full_name) and lists the comma-joined paths
      reported by FindInitializationErrors().
  """
  if self.IsInitialized():
    return self.SerializePartialToString(**kwargs)

  # Some required field is unset: build the diagnostic and bail out.
  full_name = self.DESCRIPTOR.full_name
  missing = ','.join(self.FindInitializationErrors())
  raise message_mod.EncodeError(
      'Message %s is missing required fields: %s' % (full_name, missing))
def SetInParent(self)

Sets the _cached_byte_size_dirty bit to true, and propagates this to our listener iff this was a state change.

Expand source code
def Modified(self):
  """Mark the cached byte size as stale and notify the listener once.

  When _cached_byte_size_dirty is already true this is a no-op.  Otherwise
  the dirty flag is set on this object and on _listener_for_children,
  _is_present_in_parent is set, and _listener.Modified() is invoked.
  """

  # Note:  Some callers test _cached_byte_size_dirty themselves before
  #   calling _Modified() as an extra optimization.  If this method ever
  #   starts doing work when the flag is already true, those callers need
  #   to be updated as well.
  if self._cached_byte_size_dirty:
    return

  self._cached_byte_size_dirty = True
  self._listener_for_children.dirty = True
  self._is_present_in_parent = True
  self._listener.Modified()
def UnknownFields(self)
Expand source code
def _UnknownFields(self):
  """Return self._unknown_field_set, creating and storing one if it is None."""
  # pylint: disable=protected-access
  field_set = self._unknown_field_set
  if field_set is None:
    field_set = containers.UnknownFieldSet()
    self._unknown_field_set = field_set
  return field_set
def WhichOneof(self, oneof_name)

Returns the name of the currently set field inside a oneof, or None.

Expand source code
def WhichOneof(self, oneof_name):
  """Report which member of the named oneof is currently set.

  Args:
    oneof_name: key looked up in message_descriptor.oneofs_by_name.

  Returns:
    The name of the field recorded for that oneof in self._oneofs, provided
    HasField() confirms it is set; otherwise None.

  Raises:
    ValueError: if oneof_name is not a key of oneofs_by_name.
  """
  try:
    oneof = message_descriptor.oneofs_by_name[oneof_name]
  except KeyError:
    raise ValueError(
        'Protocol message has no oneof "%s" field.' % oneof_name)

  active = self._oneofs.get(oneof)
  if active is None or not self.HasField(active.name):
    return None
  return active.name