Merge pull request #933 from ianmcorvidae/shared-contact

Add support for adding contacts using the CLI, including remotely
This commit is contained in:
Ian McEwen
2026-06-17 07:42:14 -07:00
committed by GitHub
6 changed files with 481 additions and 2 deletions

View File

@@ -552,6 +552,13 @@ def onConnected(interface):
waitForAckNak = True
interface.getNode(args.dest, False, **getNode_kwargs).resetNodeDb()
if args.add_contact:
closeNow = True
waitForAckNak = True
interface.getNode(args.dest, False, **getNode_kwargs).addContactURL(
args.add_contact
)
if args.sendtext:
closeNow = True
channelIndex = mt_config.channel_index or 0
@@ -1074,6 +1081,20 @@ def onConnected(interface):
else:
print("Install pyqrcode to view a QR code printed to terminal.")
if args.contact_qr:
closeNow = True
url = interface.getNode(args.dest, True, **getNode_kwargs).getContactURL(
args.contact_qr,
should_ignore=args.contact_ignore,
manually_verified=args.contact_verified,
)
print(f"Contact URL: {url}")
if pyqrcode is not None:
qr = pyqrcode.create(url)
print(qr.terminal())
else:
print("Install pyqrcode to view a QR code printed to terminal.")
log_set: Optional = None # type: ignore[annotation-unchecked]
# we need to keep a reference to the logset so it doesn't get GCed early
@@ -1858,6 +1879,24 @@ def addChannelConfigArgs(parser: argparse.ArgumentParser) -> argparse.ArgumentPa
action="store_true",
)
group.add_argument(
"--contact-qr",
help="Display a QR code for a node's contact data. "
"Use the node ID with a '!' or '0x' prefix or the node number. "
"Also shows the shareable contact URL.",
metavar="!xxxxxxxx",
)
group.add_argument(
"--contact-verified",
help="Set the IS_KEY_MANUALLY_VERIFIED bit in the generated contact URL",
action="store_true",
)
group.add_argument(
"--contact-ignore",
help="Mark this contact as blocked/ignored in the generated contact URL",
action="store_true",
)
group.add_argument(
"--ch-enable",
help="Enable the specified channel. Use --ch-add instead whenever possible.",
@@ -2095,6 +2134,13 @@ def addRemoteAdminArgs(parser: argparse.ArgumentParser) -> argparse.ArgumentPars
action="store_true",
)
group.add_argument(
"--add-contact",
help="Add a contact (User) to the NodeDB from a shareable URL. "
"Example: https://meshtastic.org/v/#<base64>",
metavar="URL",
)
group.add_argument(
"--set-time",
help="Set the time to the provided unix epoch timestamp, or the system's current time if omitted or 0.",

View File

@@ -380,6 +380,46 @@ class Node:
s = s.replace("=", "").replace("+", "-").replace("/", "_")
return f"https://meshtastic.org/e/#{s}"
def getContactURL(self, node_id: Union[int, str], should_ignore: bool = False, manually_verified: bool = False):
"""Generate a shareable contact URL for the specified node"""
nodeNum = to_node_num(node_id)
node = self.iface.nodesByNum.get(nodeNum)
if not node or not node.get("user"):
our_exit(f"Warning: Node {node_id} not found in NodeDB")
contact = admin_pb2.SharedContact()
contact.node_num = nodeNum
u = node["user"]
if u.get("id"):
contact.user.id = u["id"]
if u.get("macaddr"):
contact.user.macaddr = base64.b64decode(u["macaddr"])
if u.get("longName"):
contact.user.long_name = u["longName"]
if u.get("shortName"):
contact.user.short_name = u["shortName"]
if u.get("hwModel") and u["hwModel"] != "UNSET":
contact.user.hw_model = mesh_pb2.HardwareModel.Value(u["hwModel"])
if u.get("role"):
contact.user.role = config_pb2.Config.DeviceConfig.Role.Value(u["role"])
if u.get("publicKey"):
contact.user.public_key = base64.b64decode(u["publicKey"])
if u.get("isLicensed"):
contact.user.is_licensed = u["isLicensed"]
if u.get("isUnmessagable") is not None:
contact.user.is_unmessagable = u["isUnmessagable"]
if should_ignore:
contact.should_ignore = True
if manually_verified:
contact.manually_verified = True
data = contact.SerializeToString()
s = base64.urlsafe_b64encode(data).decode("ascii")
s = s.replace("=", "").replace("+", "-").replace("/", "_")
return f"https://meshtastic.org/v/#{s}"
def setURL(self, url: str, addOnly: bool = False):
"""Set mesh network URL"""
if self.localConfig is None or self.channels is None:
@@ -445,6 +485,32 @@ class Node:
self.ensureSessionKey()
self._sendAdmin(p)
def addContactURL(self, url: str):
"""Add a contact (User) to the NodeDB from a shareable URL"""
self.ensureSessionKey()
splitURL = url.split("/#")
if len(splitURL) == 1:
our_exit(f"Warning: Invalid URL '{url}'")
b64 = splitURL[-1]
missing_padding = len(b64) % 4
if missing_padding:
b64 += "=" * (4 - missing_padding)
decodedURL = base64.urlsafe_b64decode(b64)
contact = admin_pb2.SharedContact()
contact.ParseFromString(decodedURL)
p = admin_pb2.AdminMessage()
p.add_contact.CopyFrom(contact)
if self == self.iface.localNode:
onResponse = None
else:
onResponse = self.onAckNak
return self._sendAdmin(p, onResponse=onResponse)
def onResponseRequestRingtone(self, p):
"""Handle the response packet for requesting ringtone part 1"""
logger.debug(f"onResponseRequestRingtone() p:{p}")

View File

@@ -2998,6 +2998,56 @@ def test_remove_ignored_node():
main()
mocked_node.removeIgnored.assert_called_once_with("!12345678")
@pytest.mark.unit
@pytest.mark.usefixtures("reset_mt_config")
def test_add_contact_url():
"""Test --add-contact with a shareable URL"""
url = "https://meshtastic.org/v/#CKqkvZgIElEKCSE4MzBmNTIyYRIQUm9hZHJ1bm5lciBSaWRnZRoEUktTTiIGAAAAAAAAKAk4AkIgRxo_Fw_ergQIhRqBbrHasLYy3gU-Ay8hrhu4OVnIPQc=" # pylint: disable=line-too-long
sys.argv = ["", "--add-contact", url]
mt_config.args = sys.argv
mocked_node = MagicMock(autospec=Node)
iface = MagicMock(autospec=SerialInterface)
iface.getNode.return_value = mocked_node
with patch("meshtastic.serial_interface.SerialInterface", return_value=iface):
main()
mocked_node.addContactURL.assert_called_once_with(url)
@pytest.mark.unit
@pytest.mark.usefixtures("reset_mt_config")
def test_contact_qr():
"""Test --contact-qr with a node ID"""
sys.argv = ["", "--contact-qr", "!830f522a"]
mt_config.args = sys.argv
mocked_node = MagicMock(autospec=Node)
iface = MagicMock(autospec=SerialInterface)
iface.getNode.return_value = mocked_node
mocked_node.iface = iface
with patch("meshtastic.serial_interface.SerialInterface", return_value=iface):
main()
mocked_node.getContactURL.assert_called_once_with("!830f522a", should_ignore=False, manually_verified=False)
mocked_node.getContactURL.reset_mock()
@pytest.mark.unit
@pytest.mark.usefixtures("reset_mt_config")
def test_contact_qr_with_flags():
"""Test --contact-qr with --contact-verified and --contact-ignore"""
sys.argv = ["", "--contact-qr", "!830f522a", "--contact-verified", "--contact-ignore"]
mt_config.args = sys.argv
mocked_node = MagicMock(autospec=Node)
iface = MagicMock(autospec=SerialInterface)
iface.getNode.return_value = mocked_node
mocked_node.iface = iface
with patch("meshtastic.serial_interface.SerialInterface", return_value=iface):
main()
mocked_node.getContactURL.assert_called_once_with("!830f522a", should_ignore=True, manually_verified=True)
@pytest.mark.unit
@pytest.mark.usefixtures("reset_mt_config")
def test_main_set_owner_whitespace_only(capsys):

View File

@@ -1,17 +1,20 @@
"""Meshtastic unit tests for node.py"""
# pylint: disable=C0302
import base64
import logging
import re
from unittest.mock import MagicMock, patch
import pytest
from hypothesis import given, strategies as st
from ..protobuf import admin_pb2, localonly_pb2, config_pb2
from ..protobuf import admin_pb2, localonly_pb2, config_pb2, mesh_pb2, nanopb_pb2
from ..protobuf.channel_pb2 import Channel # pylint: disable=E0611
from ..node import Node
from ..serial_interface import SerialInterface
from ..mesh_interface import MeshInterface
from ..util import to_node_num
# from ..config_pb2 import Config
# from ..cannedmessages_pb2 import (CannedMessagePluginMessagePart1, CannedMessagePluginMessagePart2,
@@ -19,6 +22,11 @@ from ..mesh_interface import MeshInterface
# CannedMessagePluginMessagePart5)
# from ..util import Timeout
# Extract nanopb max_size constraints from the User protobuf descriptor
_USER_NANOPB = {
field.name: field.GetOptions().Extensions[nanopb_pb2.nanopb]
for field in mesh_pb2.User.DESCRIPTOR.fields
}
@pytest.mark.unit
def test_node(capsys):
@@ -339,6 +347,248 @@ def test_setURL_valid_URL_but_no_settings(capsys):
assert err == ""
@pytest.mark.unit
@pytest.mark.parametrize("node_id,node_data,should_ignore,manually_verified", [
pytest.param(
"!830f522a",
{
"num": 2198819370,
"user": {
"id": "!830f522a",
"longName": "Roadrunner Ridge",
"shortName": "RKSN",
"macaddr": "AAAAAAAAAAA=",
"hwModel": "RAK4631",
"role": "ROUTER",
"publicKey": "Rx8XD96uBAiFGoFusdqwti3eBT4DLyGuG7g5Wcg9Bw==",
"isLicensed": True,
"isUnmessagable": False,
},
},
True,
True,
id="all_fields_all_flags",
),
pytest.param(
"!12345678",
{
"num": 305419896,
"user": {
"id": "!12345678",
"longName": "Test Node",
"shortName": "TN",
"macaddr": "QkVTVEVWRVI=",
"hwModel": "TBEAM",
},
},
False,
False,
id="minimal_fields_no_flags",
),
pytest.param(
305419896,
{
"num": 305419896,
"user": {
"id": "!12345678",
"longName": "Another Node",
"shortName": "AN",
"macaddr": "QkVTVEVWRVI=",
"hwModel": "HELTEC_V3",
"role": "CLIENT",
"publicKey": "AAAAAAECAwQFBgcICQoLDA0ODxAREhMUFRYXGBkaGxwdHh8=",
"isLicensed": False,
},
},
True,
False,
id="int_node_id_should_ignore_only",
),
pytest.param(
"!deadbeef",
{
"num": 3735928559,
"user": {
"id": "!deadbeef",
"longName": "Minimal Contact",
"shortName": "MC",
"macaddr": "BQYHCAkKCw==",
"hwModel": "UNSET",
"role": "CLIENT_MUTE",
},
},
False,
True,
id="unset_hw_model_verified_only",
),
pytest.param(
"!1a2b3c4d",
{
"num": 439041101,
"user": {
"id": "!1a2b3c4d",
"longName": "Licensed Node",
"shortName": "LN",
"macaddr": "DA0ODxAREg==",
"hwModel": "NANO_G1",
"isLicensed": True,
"isUnmessagable": True,
},
},
False,
False,
id="licensed_unmessagable_no_flags",
),
])
def test_contact_url_roundtrip(node_id, node_data, should_ignore, manually_verified):
"""Verify that contact URL generation via getContactURL() and parsing via addContactURL() is fully reversible"""
iface = MagicMock(autospec=MeshInterface)
node_num = to_node_num(node_id)
iface.nodesByNum = {node_num: node_data}
iface.localNode = None
anode = Node(iface, node_num, noProto=True)
sent_admin = []
def capture_send(p, *_args, **_kwargs):
sent_admin.append(p)
with patch.object(anode, "_sendAdmin", side_effect=capture_send):
url = anode.getContactURL(node_id, should_ignore=should_ignore, manually_verified=manually_verified)
assert url.startswith("https://meshtastic.org/v/#")
anode.addContactURL(url)
assert len(sent_admin) == 1
contact = sent_admin[0].add_contact
u = node_data["user"]
assert contact.node_num == node_num
assert contact.user.id == u["id"]
assert contact.user.long_name == u["longName"]
assert contact.user.short_name == u["shortName"]
assert contact.user.macaddr == base64.b64decode(u["macaddr"])
if u.get("hwModel") and u["hwModel"] != "UNSET":
assert contact.user.hw_model == mesh_pb2.HardwareModel.Value(u["hwModel"])
if u.get("role"):
assert contact.user.role == config_pb2.Config.DeviceConfig.Role.Value(u["role"])
if u.get("publicKey"):
assert contact.user.public_key == base64.b64decode(u["publicKey"])
if u.get("isLicensed"):
assert contact.user.is_licensed is True
if u.get("isUnmessagable") is not None:
assert contact.user.is_unmessagable == u["isUnmessagable"]
assert contact.should_ignore == should_ignore
assert contact.manually_verified == manually_verified
@st.composite
def contact_url_roundtrip_params(draw):
"""Hypothesis strategy: generate a full node config and roundtrip flags"""
should_ignore = draw(st.booleans())
manually_verified = draw(st.booleans())
node_num = draw(st.integers(min_value=6, max_value=2**32 - 2))
node_id = f"!{node_num:08x}"
hw_model = draw(st.sampled_from(list(mesh_pb2.HardwareModel.keys())))
role = draw(st.one_of(
st.none(),
st.sampled_from(list(config_pb2.Config.DeviceConfig.Role.keys())),
))
long_name = draw(st.text(
min_size=1, max_size=_USER_NANOPB['long_name'].max_size
))
short_name = draw(st.text(
min_size=1, max_size=_USER_NANOPB['short_name'].max_size
))
macaddr_bytes = draw(st.binary(
min_size=_USER_NANOPB['macaddr'].max_size,
max_size=_USER_NANOPB['macaddr'].max_size,
))
macaddr_b64 = base64.b64encode(macaddr_bytes).decode("ascii")
has_public_key = draw(st.booleans())
public_key_b64 = None
if has_public_key:
pk_bytes = draw(st.binary(
min_size=_USER_NANOPB['public_key'].max_size,
max_size=_USER_NANOPB['public_key'].max_size,
))
public_key_b64 = base64.b64encode(pk_bytes).decode("ascii")
is_licensed = draw(st.booleans())
is_unmessagable = draw(st.booleans())
node_data = {
"num": node_num,
"user": {
"id": node_id,
"longName": long_name,
"shortName": short_name,
"macaddr": macaddr_b64,
"hwModel": hw_model,
"isLicensed": is_licensed,
"isUnmessagable": is_unmessagable,
},
}
if role is not None:
node_data["user"]["role"] = role
if public_key_b64 is not None:
node_data["user"]["publicKey"] = public_key_b64
return node_num, node_data, should_ignore, manually_verified
@pytest.mark.unit
@given(contact_url_roundtrip_params())
def test_contact_url_roundtrip_hypothesis(params):
"""Property: roundtrip preserves data across random field configurations"""
node_num, node_data, should_ignore, manually_verified = params
iface = MagicMock(autospec=MeshInterface)
iface.nodesByNum = {node_num: node_data}
iface.localNode = None
anode = Node(iface, node_num, noProto=True)
sent_admin = []
def capture_send(p, *_args, **_kwargs):
sent_admin.append(p)
with patch.object(anode, "_sendAdmin", side_effect=capture_send):
url = anode.getContactURL(
node_num,
should_ignore=should_ignore,
manually_verified=manually_verified,
)
anode.addContactURL(url)
assert len(sent_admin) == 1
contact = sent_admin[0].add_contact
u = node_data["user"]
assert contact.node_num == node_num
assert contact.user.id == u["id"]
assert contact.user.long_name == u["longName"]
assert contact.user.short_name == u["shortName"]
assert contact.user.macaddr == base64.b64decode(u["macaddr"])
assert contact.user.hw_model == mesh_pb2.HardwareModel.Value(u["hwModel"])
if "role" in u:
assert contact.user.role == config_pb2.Config.DeviceConfig.Role.Value(u["role"])
if "publicKey" in u:
assert contact.user.public_key == base64.b64decode(u["publicKey"])
assert contact.user.is_licensed == u["isLicensed"]
assert contact.user.is_unmessagable == u["isUnmessagable"]
assert contact.should_ignore == should_ignore
assert contact.manually_verified == manually_verified
# TODO
# @pytest.mark.unit
# def test_showChannels(capsys):

View File

@@ -37,6 +37,7 @@ from meshtastic.util import (
stripnl,
support_info,
message_to_json,
to_node_num,
Acknowledgment
)
@@ -715,3 +716,69 @@ def test_generate_channel_hash_fuzz_aes256(channel_name, key_bytes):
"Test generate_channel_hash with fuzzed channel names and 256-bit keys, ensuring it produces single-byte values"
hashed = generate_channel_hash(channel_name, key_bytes)
assert 0 <= hashed <= 0xFF
@pytest.mark.unit
@pytest.mark.parametrize("input_val,expected", [
# int passthrough
(0, 0),
(1, 1),
(6, 6),
(502009325, 502009325),
(2198819370, 2198819370),
(0xFFFFFFFF, 0xFFFFFFFF),
# !hex format (always treated as hex)
("!00000000", 0x00000000),
("!00000001", 0x00000001),
("!00000010", 0x00000010),
("!000000ff", 0x000000FF),
("!830f522a", 0x830F522A),
("!1dec0ded", 0x1DEC0DED),
("!ffffffff", 0xFFFFFFFF),
("!FFFFFFFF", 0xFFFFFFFF),
# 0xhex format
("0x00000000", 0x00000000),
("0x00000010", 0x00000010),
("0x830f522a", 0x830F522A),
("0x1dec0ded", 0x1DEC0DED),
("0xFFFFFFFF", 0xFFFFFFFF),
# Unprefixed hex string (falls back to hex when decimal fails)
("830f522a", 0x830F522A),
("1dec0ded", 0x1DEC0DED),
# Decimal string
("42", 42),
("12345678", 12345678),
("0", 0),
("1", 1),
# With whitespace
(" !830f522a ", 2198819370),
(" !00000010 ", 16),
(" 0x830f522a ", 2198819370),
])
def test_to_node_num(input_val, expected):
"""Test to_node_num with various valid inputs"""
assert to_node_num(input_val) == expected
@pytest.mark.unit
@pytest.mark.parametrize("input_val", [
"",
"!",
"!!",
"!0x10",
"!xyz",
])
def test_to_node_num_invalid(input_val):
"""Test to_node_num raises ValueError for invalid inputs"""
with pytest.raises(ValueError):
to_node_num(input_val)
@pytest.mark.unit
@given(st.integers(min_value=0, max_value=2**32 - 1))
def test_to_node_num_hypothesis_roundtrip(n):
"""Property: all supported input formats roundtrip for any valid node number"""
assert to_node_num(n) == n
assert to_node_num(f"!{n:08x}") == n
assert to_node_num(f"0x{n:x}") == n
assert to_node_num(str(n)) == n

View File

@@ -728,7 +728,7 @@ def to_node_num(node_id: Union[int, str]) -> int:
return node_id
s = str(node_id).strip()
if s.startswith("!"):
s = s[1:]
s = "0x" + s[1:]
if s.lower().startswith("0x"):
return int(s, 16)
try: