mirror of
https://github.com/meshtastic/python.git
synced 2026-01-18 12:48:03 -05:00
Move to_node_num to util and updated function calls
This commit is contained in:
@@ -16,6 +16,7 @@ from meshtastic.util import (
|
||||
pskToString,
|
||||
stripnl,
|
||||
message_to_json,
|
||||
to_node_num,
|
||||
)
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
@@ -52,20 +53,6 @@ class Node:
|
||||
r += ")"
|
||||
return r
|
||||
|
||||
def _to_node_num(self, nodeId: Union[int, str]) -> int:
|
||||
"""Normalize node id from int | '!hex' | '0xhex' | 'decimal' to int."""
|
||||
if isinstance(nodeId, int):
|
||||
return nodeId
|
||||
s = str(nodeId).strip()
|
||||
if s.startswith("!"):
|
||||
s = s[1:]
|
||||
if s.lower().startswith("0x"):
|
||||
return int(s, 16)
|
||||
try:
|
||||
return int(s, 10)
|
||||
except ValueError:
|
||||
return int(s, 16)
|
||||
|
||||
def module_available(self, excluded_bit: int) -> bool:
|
||||
"""Check DeviceMetadata.excluded_modules to see if a module is available."""
|
||||
meta = getattr(self.iface, "metadata", None)
|
||||
@@ -728,7 +715,7 @@ class Node:
|
||||
def removeNode(self, nodeId: Union[int, str]):
|
||||
"""Tell the node to remove a specific node by ID"""
|
||||
self.ensureSessionKey()
|
||||
nodeId = self._to_node_num(nodeId)
|
||||
nodeId = to_node_num(nodeId)
|
||||
|
||||
p = admin_pb2.AdminMessage()
|
||||
p.remove_by_nodenum = nodeId
|
||||
@@ -742,7 +729,7 @@ class Node:
|
||||
def setFavorite(self, nodeId: Union[int, str]):
|
||||
"""Tell the node to set the specified node ID to be favorited on the NodeDB on the device"""
|
||||
self.ensureSessionKey()
|
||||
nodeId = self._to_node_num(nodeId)
|
||||
nodeId = to_node_num(nodeId)
|
||||
|
||||
p = admin_pb2.AdminMessage()
|
||||
p.set_favorite_node = nodeId
|
||||
@@ -756,7 +743,7 @@ class Node:
|
||||
def removeFavorite(self, nodeId: Union[int, str]):
|
||||
"""Tell the node to set the specified node ID to be un-favorited on the NodeDB on the device"""
|
||||
self.ensureSessionKey()
|
||||
nodeId = self._to_node_num(nodeId)
|
||||
nodeId = to_node_num(nodeId)
|
||||
|
||||
p = admin_pb2.AdminMessage()
|
||||
p.remove_favorite_node = nodeId
|
||||
@@ -770,7 +757,7 @@ class Node:
|
||||
def setIgnored(self, nodeId: Union[int, str]):
|
||||
"""Tell the node to set the specified node ID to be ignored on the NodeDB on the device"""
|
||||
self.ensureSessionKey()
|
||||
nodeId = self._to_node_num(nodeId)
|
||||
nodeId = to_node_num(nodeId)
|
||||
|
||||
p = admin_pb2.AdminMessage()
|
||||
p.set_ignored_node = nodeId
|
||||
@@ -784,7 +771,7 @@ class Node:
|
||||
def removeIgnored(self, nodeId: Union[int, str]):
|
||||
"""Tell the node to set the specified node ID to be un-ignored on the NodeDB on the device"""
|
||||
self.ensureSessionKey()
|
||||
nodeId = self._to_node_num(nodeId)
|
||||
nodeId = to_node_num(nodeId)
|
||||
|
||||
p = admin_pb2.AdminMessage()
|
||||
p.remove_ignored_node = nodeId
|
||||
@@ -1007,7 +994,7 @@ class Node:
|
||||
): # unless a special channel index was used, we want to use the admin index
|
||||
adminIndex = self.iface.localNode._getAdminChannelIndex()
|
||||
logger.debug(f"adminIndex:{adminIndex}")
|
||||
nodeid = self._to_node_num(self.nodeNum)
|
||||
nodeid = to_node_num(self.nodeNum)
|
||||
if "adminSessionPassKey" in self.iface._getOrCreateByNum(nodeid):
|
||||
p.session_passkey = self.iface._getOrCreateByNum(nodeid).get("adminSessionPassKey")
|
||||
return self.iface.sendData(
|
||||
@@ -1028,6 +1015,6 @@ class Node:
|
||||
f"Not ensuring session key, because protocol use is disabled by noProto"
|
||||
)
|
||||
else:
|
||||
nodeid = self._to_node_num(self.nodeNum)
|
||||
nodeid = to_node_num(self.nodeNum)
|
||||
if self.iface._getOrCreateByNum(nodeid).get("adminSessionPassKey") is None:
|
||||
self.requestConfig(admin_pb2.AdminMessage.SESSIONKEY_CONFIG)
|
||||
|
||||
@@ -692,3 +692,20 @@ def message_to_json(message: Message, multiline: bool=False) -> str:
|
||||
except TypeError:
|
||||
json = MessageToJson(message, including_default_value_fields=True) # type: ignore[call-arg] # pylint: disable=E1123
|
||||
return stripnl(json) if not multiline else json
|
||||
|
||||
|
||||
def to_node_num(node_id: Union[int, str]) -> int:
|
||||
"""
|
||||
Normalize a node id from int | '!hex' | '0xhex' | 'decimal' to int.
|
||||
"""
|
||||
if isinstance(node_id, int):
|
||||
return node_id
|
||||
s = str(node_id).strip()
|
||||
if s.startswith("!"):
|
||||
s = s[1:]
|
||||
if s.lower().startswith("0x"):
|
||||
return int(s, 16)
|
||||
try:
|
||||
return int(s, 10)
|
||||
except ValueError:
|
||||
return int(s, 16)
|
||||
|
||||
Reference in New Issue
Block a user