mirror of
https://github.com/meshtastic/python.git
synced 2026-06-18 04:20:05 -04:00
Add support for adding contacts using the CLI, including remotely
This commit is contained in:
@@ -552,6 +552,13 @@ def onConnected(interface):
|
||||
waitForAckNak = True
|
||||
interface.getNode(args.dest, False, **getNode_kwargs).resetNodeDb()
|
||||
|
||||
if args.add_contact:
|
||||
closeNow = True
|
||||
waitForAckNak = True
|
||||
interface.getNode(args.dest, False, **getNode_kwargs).addContactURL(
|
||||
args.add_contact
|
||||
)
|
||||
|
||||
if args.sendtext:
|
||||
closeNow = True
|
||||
channelIndex = mt_config.channel_index or 0
|
||||
@@ -1074,6 +1081,20 @@ def onConnected(interface):
|
||||
else:
|
||||
print("Install pyqrcode to view a QR code printed to terminal.")
|
||||
|
||||
if args.contact_qr:
|
||||
closeNow = True
|
||||
url = interface.getNode(args.dest, True, **getNode_kwargs).getContactURL(
|
||||
args.contact_qr,
|
||||
should_ignore=args.contact_ignore,
|
||||
manually_verified=args.contact_verified,
|
||||
)
|
||||
print(f"Contact URL: {url}")
|
||||
if pyqrcode is not None:
|
||||
qr = pyqrcode.create(url)
|
||||
print(qr.terminal())
|
||||
else:
|
||||
print("Install pyqrcode to view a QR code printed to terminal.")
|
||||
|
||||
log_set: Optional = None # type: ignore[annotation-unchecked]
|
||||
# we need to keep a reference to the logset so it doesn't get GCed early
|
||||
|
||||
@@ -1858,6 +1879,24 @@ def addChannelConfigArgs(parser: argparse.ArgumentParser) -> argparse.ArgumentPa
|
||||
action="store_true",
|
||||
)
|
||||
|
||||
group.add_argument(
|
||||
"--contact-qr",
|
||||
help="Display a QR code for a node's contact data. "
|
||||
"Use the node ID with a '!' or '0x' prefix or the node number. "
|
||||
"Also shows the shareable contact URL.",
|
||||
metavar="!xxxxxxxx",
|
||||
)
|
||||
group.add_argument(
|
||||
"--contact-verified",
|
||||
help="Set the IS_KEY_MANUALLY_VERIFIED bit in the generated contact URL",
|
||||
action="store_true",
|
||||
)
|
||||
group.add_argument(
|
||||
"--contact-ignore",
|
||||
help="Mark this contact as blocked/ignored in the generated contact URL",
|
||||
action="store_true",
|
||||
)
|
||||
|
||||
group.add_argument(
|
||||
"--ch-enable",
|
||||
help="Enable the specified channel. Use --ch-add instead whenever possible.",
|
||||
@@ -2095,6 +2134,13 @@ def addRemoteAdminArgs(parser: argparse.ArgumentParser) -> argparse.ArgumentPars
|
||||
action="store_true",
|
||||
)
|
||||
|
||||
group.add_argument(
|
||||
"--add-contact",
|
||||
help="Add a contact (User) to the NodeDB from a shareable URL. "
|
||||
"Example: https://meshtastic.org/v/#<base64>",
|
||||
metavar="URL",
|
||||
)
|
||||
|
||||
group.add_argument(
|
||||
"--set-time",
|
||||
help="Set the time to the provided unix epoch timestamp, or the system's current time if omitted or 0.",
|
||||
|
||||
@@ -380,6 +380,46 @@ class Node:
|
||||
s = s.replace("=", "").replace("+", "-").replace("/", "_")
|
||||
return f"https://meshtastic.org/e/#{s}"
|
||||
|
||||
def getContactURL(self, node_id: Union[int, str], should_ignore: bool = False, manually_verified: bool = False):
|
||||
"""Generate a shareable contact URL for the specified node"""
|
||||
nodeNum = to_node_num(node_id)
|
||||
|
||||
node = self.iface.nodesByNum.get(nodeNum)
|
||||
if not node or not node.get("user"):
|
||||
our_exit(f"Warning: Node {node_id} not found in NodeDB")
|
||||
|
||||
contact = admin_pb2.SharedContact()
|
||||
contact.node_num = nodeNum
|
||||
|
||||
u = node["user"]
|
||||
if u.get("id"):
|
||||
contact.user.id = u["id"]
|
||||
if u.get("macaddr"):
|
||||
contact.user.macaddr = base64.b64decode(u["macaddr"])
|
||||
if u.get("longName"):
|
||||
contact.user.long_name = u["longName"]
|
||||
if u.get("shortName"):
|
||||
contact.user.short_name = u["shortName"]
|
||||
if u.get("hwModel") and u["hwModel"] != "UNSET":
|
||||
contact.user.hw_model = mesh_pb2.HardwareModel.Value(u["hwModel"])
|
||||
if u.get("role"):
|
||||
contact.user.role = u["role"]
|
||||
if u.get("publicKey"):
|
||||
contact.user.public_key = base64.b64decode(u["publicKey"])
|
||||
if u.get("isLicensed"):
|
||||
contact.user.is_licensed = u["isLicensed"]
|
||||
if u.get("isUnmessagable") is not None:
|
||||
contact.user.is_unmessagable = u["isUnmessagable"]
|
||||
if should_ignore:
|
||||
contact.should_ignore = True
|
||||
if manually_verified:
|
||||
contact.manually_verified = True
|
||||
|
||||
data = contact.SerializeToString()
|
||||
s = base64.urlsafe_b64encode(data).decode("ascii")
|
||||
s = s.replace("=", "").replace("+", "-").replace("/", "_")
|
||||
return f"https://meshtastic.org/v/#{s}"
|
||||
|
||||
def setURL(self, url: str, addOnly: bool = False):
|
||||
"""Set mesh network URL"""
|
||||
if self.localConfig is None or self.channels is None:
|
||||
@@ -445,6 +485,32 @@ class Node:
|
||||
self.ensureSessionKey()
|
||||
self._sendAdmin(p)
|
||||
|
||||
def addContactURL(self, url: str):
|
||||
"""Add a contact (User) to the NodeDB from a shareable URL"""
|
||||
self.ensureSessionKey()
|
||||
|
||||
splitURL = url.split("/#")
|
||||
if len(splitURL) == 1:
|
||||
our_exit(f"Warning: Invalid URL '{url}'")
|
||||
b64 = splitURL[-1]
|
||||
|
||||
missing_padding = len(b64) % 4
|
||||
if missing_padding:
|
||||
b64 += "=" * (4 - missing_padding)
|
||||
|
||||
decodedURL = base64.urlsafe_b64decode(b64)
|
||||
contact = admin_pb2.SharedContact()
|
||||
contact.ParseFromString(decodedURL)
|
||||
|
||||
p = admin_pb2.AdminMessage()
|
||||
p.add_contact.CopyFrom(contact)
|
||||
|
||||
if self == self.iface.localNode:
|
||||
onResponse = None
|
||||
else:
|
||||
onResponse = self.onAckNak
|
||||
return self._sendAdmin(p, onResponse=onResponse)
|
||||
|
||||
def onResponseRequestRingtone(self, p):
|
||||
"""Handle the response packet for requesting ringtone part 1"""
|
||||
logger.debug(f"onResponseRequestRingtone() p:{p}")
|
||||
|
||||
@@ -2998,6 +2998,56 @@ def test_remove_ignored_node():
|
||||
main()
|
||||
|
||||
mocked_node.removeIgnored.assert_called_once_with("!12345678")
|
||||
|
||||
@pytest.mark.unit
|
||||
@pytest.mark.usefixtures("reset_mt_config")
|
||||
def test_add_contact_url():
|
||||
"""Test --add-contact with a shareable URL"""
|
||||
url = "https://meshtastic.org/v/#CKqkvZgIElEKCSE4MzBmNTIyYRIQUm9hZHJ1bm5lciBSaWRnZRoEUktTTiIGAAAAAAAAKAk4AkIgRxo_Fw_ergQIhRqBbrHasLYy3gU-Ay8hrhu4OVnIPQc=" # pylint: disable=line-too-long
|
||||
sys.argv = ["", "--add-contact", url]
|
||||
mt_config.args = sys.argv
|
||||
mocked_node = MagicMock(autospec=Node)
|
||||
iface = MagicMock(autospec=SerialInterface)
|
||||
iface.getNode.return_value = mocked_node
|
||||
with patch("meshtastic.serial_interface.SerialInterface", return_value=iface):
|
||||
main()
|
||||
|
||||
mocked_node.addContactURL.assert_called_once_with(url)
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
@pytest.mark.usefixtures("reset_mt_config")
|
||||
def test_contact_qr():
|
||||
"""Test --contact-qr with a node ID"""
|
||||
sys.argv = ["", "--contact-qr", "!830f522a"]
|
||||
mt_config.args = sys.argv
|
||||
mocked_node = MagicMock(autospec=Node)
|
||||
iface = MagicMock(autospec=SerialInterface)
|
||||
iface.getNode.return_value = mocked_node
|
||||
mocked_node.iface = iface
|
||||
with patch("meshtastic.serial_interface.SerialInterface", return_value=iface):
|
||||
main()
|
||||
|
||||
mocked_node.getContactURL.assert_called_once_with("!830f522a", should_ignore=False, manually_verified=False)
|
||||
mocked_node.getContactURL.reset_mock()
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
@pytest.mark.usefixtures("reset_mt_config")
|
||||
def test_contact_qr_with_flags():
|
||||
"""Test --contact-qr with --contact-verified and --contact-ignore"""
|
||||
sys.argv = ["", "--contact-qr", "!830f522a", "--contact-verified", "--contact-ignore"]
|
||||
mt_config.args = sys.argv
|
||||
mocked_node = MagicMock(autospec=Node)
|
||||
iface = MagicMock(autospec=SerialInterface)
|
||||
iface.getNode.return_value = mocked_node
|
||||
mocked_node.iface = iface
|
||||
with patch("meshtastic.serial_interface.SerialInterface", return_value=iface):
|
||||
main()
|
||||
|
||||
mocked_node.getContactURL.assert_called_once_with("!830f522a", should_ignore=True, manually_verified=True)
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
@pytest.mark.usefixtures("reset_mt_config")
|
||||
def test_main_set_owner_whitespace_only(capsys):
|
||||
|
||||
@@ -7,7 +7,9 @@ from unittest.mock import MagicMock, patch
|
||||
|
||||
import pytest
|
||||
|
||||
from ..protobuf import admin_pb2, localonly_pb2, config_pb2
|
||||
import base64
|
||||
|
||||
from ..protobuf import admin_pb2, localonly_pb2, config_pb2, mesh_pb2
|
||||
from ..protobuf.channel_pb2 import Channel # pylint: disable=E0611
|
||||
from ..node import Node
|
||||
from ..serial_interface import SerialInterface
|
||||
@@ -339,6 +341,56 @@ def test_setURL_valid_URL_but_no_settings(capsys):
|
||||
assert err == ""
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
def test_contact_url_roundtrip():
|
||||
"""Verify that contact URL generation and parsing is fully reversible"""
|
||||
def encode_url(contact):
|
||||
data = contact.SerializeToString()
|
||||
s = base64.urlsafe_b64encode(data).decode("ascii")
|
||||
s = s.replace("=", "").replace("+", "-").replace("/", "_")
|
||||
return f"https://meshtastic.org/v/#{s}"
|
||||
|
||||
def decode_url(url):
|
||||
b64 = url.split("/#")[-1]
|
||||
missing_padding = len(b64) % 4
|
||||
if missing_padding:
|
||||
b64 += "=" * (4 - missing_padding)
|
||||
decoded = base64.urlsafe_b64decode(b64)
|
||||
contact = admin_pb2.SharedContact()
|
||||
contact.ParseFromString(decoded)
|
||||
return contact
|
||||
|
||||
original = admin_pb2.SharedContact()
|
||||
original.node_num = 2198819370
|
||||
original.user.id = "!830f522a"
|
||||
original.user.long_name = "Roadrunner Ridge"
|
||||
original.user.short_name = "RKSN"
|
||||
original.user.macaddr = b'\x00\x00\x00\x00\x00\x00'
|
||||
original.user.hw_model = mesh_pb2.HardwareModel.Value("RAK4631")
|
||||
original.user.role = mesh_pb2.User.DESCRIPTOR.fields_by_name['role'].enum_type.values_by_name["ROUTER"].number
|
||||
original.user.public_key = bytes.fromhex("471a3f170fdeae0408851a816eb1dab0b632de053e032f21ae1bb83959c83d07")
|
||||
original.user.is_licensed = True
|
||||
original.user.is_unmessagable = False
|
||||
original.should_ignore = True
|
||||
original.manually_verified = True
|
||||
|
||||
url = encode_url(original)
|
||||
parsed = decode_url(url)
|
||||
|
||||
assert parsed.node_num == original.node_num
|
||||
assert parsed.user.id == original.user.id
|
||||
assert parsed.user.long_name == original.user.long_name
|
||||
assert parsed.user.short_name == original.user.short_name
|
||||
assert parsed.user.macaddr == original.user.macaddr
|
||||
assert parsed.user.hw_model == original.user.hw_model
|
||||
assert parsed.user.role == original.user.role
|
||||
assert parsed.user.public_key == original.user.public_key
|
||||
assert parsed.user.is_licensed == original.user.is_licensed
|
||||
assert parsed.user.is_unmessagable == original.user.is_unmessagable
|
||||
assert parsed.should_ignore == original.should_ignore
|
||||
assert parsed.manually_verified == original.manually_verified
|
||||
|
||||
|
||||
# TODO
|
||||
# @pytest.mark.unit
|
||||
# def test_showChannels(capsys):
|
||||
|
||||
Reference in New Issue
Block a user