mirror of
https://github.com/meshtastic/python.git
synced 2025-12-24 08:27:55 -05:00
Merge pull request #843 from SpudGunMan/channel-hash-info
Add Channel Hash Utility to Node class
This commit is contained in:
@@ -16,6 +16,7 @@ from meshtastic.util import (
|
||||
pskToString,
|
||||
stripnl,
|
||||
message_to_json,
|
||||
generate_channel_hash,
|
||||
to_node_num,
|
||||
)
|
||||
|
||||
@@ -1018,3 +1019,20 @@ class Node:
|
||||
nodeid = to_node_num(self.nodeNum)
|
||||
if self.iface._getOrCreateByNum(nodeid).get("adminSessionPassKey") is None:
|
||||
self.requestConfig(admin_pb2.AdminMessage.SESSIONKEY_CONFIG)
|
||||
|
||||
def get_channels_with_hash(self):
|
||||
"""Return a list of dicts with channel info and hash."""
|
||||
result = []
|
||||
if self.channels:
|
||||
for c in self.channels:
|
||||
if c.settings and hasattr(c.settings, "name") and hasattr(c.settings, "psk"):
|
||||
hash_val = generate_channel_hash(c.settings.name, c.settings.psk)
|
||||
else:
|
||||
hash_val = None
|
||||
result.append({
|
||||
"index": c.index,
|
||||
"role": channel_pb2.Channel.Role.Name(c.role),
|
||||
"name": c.settings.name if c.settings and hasattr(c.settings, "name") else "",
|
||||
"hash": hash_val,
|
||||
})
|
||||
return result
|
||||
|
||||
@@ -11,16 +11,19 @@ from hypothesis import given, strategies as st
|
||||
from meshtastic.supported_device import SupportedDevice
|
||||
from meshtastic.protobuf import mesh_pb2
|
||||
from meshtastic.util import (
|
||||
DEFAULT_KEY,
|
||||
Timeout,
|
||||
active_ports_on_supported_devices,
|
||||
camel_to_snake,
|
||||
catchAndIgnore,
|
||||
channel_hash,
|
||||
convert_mac_addr,
|
||||
eliminate_duplicate_port,
|
||||
findPorts,
|
||||
fixme,
|
||||
fromPSK,
|
||||
fromStr,
|
||||
generate_channel_hash,
|
||||
genPSK256,
|
||||
hexstr,
|
||||
ipstr,
|
||||
@@ -670,3 +673,45 @@ def test_shorthex():
|
||||
assert result == b'\x05'
|
||||
result = fromStr('0xffff')
|
||||
assert result == b'\xff\xff'
|
||||
|
||||
def test_channel_hash_basics():
|
||||
"Test the default key and LongFast with channel_hash"
|
||||
assert channel_hash(DEFAULT_KEY) == 2
|
||||
assert channel_hash("LongFast".encode("utf-8")) == 10
|
||||
|
||||
@given(st.text(min_size=1, max_size=12))
|
||||
def test_channel_hash_fuzz(channel_name):
|
||||
"Test channel_hash with fuzzed channel names, ensuring it produces single-byte values"
|
||||
hashed = channel_hash(channel_name.encode("utf-8"))
|
||||
assert 0 <= hashed <= 0xFF
|
||||
|
||||
def test_generate_channel_hash_basics():
|
||||
"Test the default key and LongFast/MediumFast with generate_channel_hash"
|
||||
assert generate_channel_hash("LongFast", "AQ==") == 8
|
||||
assert generate_channel_hash("LongFast", bytes([1])) == 8
|
||||
assert generate_channel_hash("LongFast", DEFAULT_KEY) == 8
|
||||
assert generate_channel_hash("MediumFast", DEFAULT_KEY) == 31
|
||||
|
||||
@given(st.text(min_size=1, max_size=12))
|
||||
def test_generate_channel_hash_fuzz_default_key(channel_name):
|
||||
"Test generate_channel_hash with fuzzed channel names and the default key, ensuring it produces single-byte values"
|
||||
hashed = generate_channel_hash(channel_name, DEFAULT_KEY)
|
||||
assert 0 <= hashed <= 0xFF
|
||||
|
||||
@given(st.text(min_size=1, max_size=12), st.binary(min_size=1, max_size=1))
|
||||
def test_generate_channel_hash_fuzz_simple(channel_name, key_bytes):
|
||||
"Test generate_channel_hash with fuzzed channel names and one-byte keys, ensuring it produces single-byte values"
|
||||
hashed = generate_channel_hash(channel_name, key_bytes)
|
||||
assert 0 <= hashed <= 0xFF
|
||||
|
||||
@given(st.text(min_size=1, max_size=12), st.binary(min_size=16, max_size=16))
|
||||
def test_generate_channel_hash_fuzz_aes128(channel_name, key_bytes):
|
||||
"Test generate_channel_hash with fuzzed channel names and 128-bit keys, ensuring it produces single-byte values"
|
||||
hashed = generate_channel_hash(channel_name, key_bytes)
|
||||
assert 0 <= hashed <= 0xFF
|
||||
|
||||
@given(st.text(min_size=1, max_size=12), st.binary(min_size=32, max_size=32))
|
||||
def test_generate_channel_hash_fuzz_aes256(channel_name, key_bytes):
|
||||
"Test generate_channel_hash with fuzzed channel names and 256-bit keys, ensuring it produces single-byte values"
|
||||
hashed = generate_channel_hash(channel_name, key_bytes)
|
||||
assert 0 <= hashed <= 0xFF
|
||||
|
||||
@@ -40,6 +40,8 @@ whitelistVids = dict.fromkeys([0x239a, 0x303a])
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
DEFAULT_KEY = base64.b64decode("1PG7OiApB1nwvP+rz05pAQ==".encode("utf-8"))
|
||||
|
||||
def quoteBooleans(a_string: str) -> str:
|
||||
"""Quote booleans
|
||||
given a string that contains ": true", replace with ": 'true'" (or false)
|
||||
@@ -365,6 +367,30 @@ def remove_keys_from_dict(keys: Union[Tuple, List, Set], adict: Dict) -> Dict:
|
||||
remove_keys_from_dict(keys, val)
|
||||
return adict
|
||||
|
||||
def channel_hash(data: bytes) -> int:
|
||||
"""Compute an XOR hash from bytes for channel evaluation."""
|
||||
result = 0
|
||||
for char in data:
|
||||
result ^= char
|
||||
return result
|
||||
|
||||
def generate_channel_hash(name: Union[str, bytes], key: Union[str, bytes]) -> int:
|
||||
"""generate the channel number by hashing the channel name and psk (accepts str or bytes for both)"""
|
||||
# Handle key as str or bytes
|
||||
if isinstance(key, str):
|
||||
key = base64.b64decode(key.replace("-", "+").replace("_", "/").encode("utf-8"))
|
||||
|
||||
if len(key) == 1:
|
||||
key = DEFAULT_KEY[:-1] + key
|
||||
|
||||
# Handle name as str or bytes
|
||||
if isinstance(name, str):
|
||||
name = name.encode("utf-8")
|
||||
|
||||
h_name = channel_hash(name)
|
||||
h_key = channel_hash(key)
|
||||
result: int = h_name ^ h_key
|
||||
return result
|
||||
|
||||
def hexstr(barray: bytes) -> str:
|
||||
"""Print a string of hex digits"""
|
||||
|
||||
Reference in New Issue
Block a user