mirror of
https://github.com/meshtastic/python.git
synced 2025-12-24 00:17:54 -05:00
split out constant, improve logic some, add tests for channel_hash and generate_channel_hash
This commit is contained in:
@@ -11,16 +11,19 @@ from hypothesis import given, strategies as st
|
||||
from meshtastic.supported_device import SupportedDevice
|
||||
from meshtastic.protobuf import mesh_pb2
|
||||
from meshtastic.util import (
|
||||
DEFAULT_KEY,
|
||||
Timeout,
|
||||
active_ports_on_supported_devices,
|
||||
camel_to_snake,
|
||||
catchAndIgnore,
|
||||
channel_hash,
|
||||
convert_mac_addr,
|
||||
eliminate_duplicate_port,
|
||||
findPorts,
|
||||
fixme,
|
||||
fromPSK,
|
||||
fromStr,
|
||||
generate_channel_hash,
|
||||
genPSK256,
|
||||
hexstr,
|
||||
ipstr,
|
||||
@@ -670,3 +673,45 @@ def test_shorthex():
|
||||
assert result == b'\x05'
|
||||
result = fromStr('0xffff')
|
||||
assert result == b'\xff\xff'
|
||||
|
||||
def test_channel_hash_basics():
|
||||
"Test the default key and LongFast with channel_hash"
|
||||
assert channel_hash(DEFAULT_KEY) == 2
|
||||
assert channel_hash("LongFast".encode("utf-8")) == 10
|
||||
|
||||
@given(st.text(min_size=1, max_size=12))
|
||||
def test_channel_hash_fuzz(channel_name):
|
||||
"Test channel_hash with fuzzed channel names, ensuring it produces single-byte values"
|
||||
hashed = channel_hash(channel_name.encode("utf-8"))
|
||||
assert 0 <= hashed <= 0xFF
|
||||
|
||||
def test_generate_channel_hash_basics():
|
||||
"Test the default key and LongFast/MediumFast with generate_channel_hash"
|
||||
assert generate_channel_hash("LongFast", "AQ==") == 8
|
||||
assert generate_channel_hash("LongFast", bytes([1])) == 8
|
||||
assert generate_channel_hash("LongFast", DEFAULT_KEY) == 8
|
||||
assert generate_channel_hash("MediumFast", DEFAULT_KEY) == 31
|
||||
|
||||
@given(st.text(min_size=1, max_size=12))
|
||||
def test_generate_channel_hash_fuzz_default_key(channel_name):
|
||||
"Test generate_channel_hash with fuzzed channel names and the default key, ensuring it produces single-byte values"
|
||||
hashed = generate_channel_hash(channel_name, DEFAULT_KEY)
|
||||
assert 0 <= hashed <= 0xFF
|
||||
|
||||
@given(st.text(min_size=1, max_size=12), st.binary(min_size=1, max_size=1))
|
||||
def test_generate_channel_hash_fuzz_simple(channel_name, key_bytes):
|
||||
"Test generate_channel_hash with fuzzed channel names and one-byte keys, ensuring it produces single-byte values"
|
||||
hashed = generate_channel_hash(channel_name, key_bytes)
|
||||
assert 0 <= hashed <= 0xFF
|
||||
|
||||
@given(st.text(min_size=1, max_size=12), st.binary(min_size=16, max_size=16))
|
||||
def test_generate_channel_hash_fuzz_aes128(channel_name, key_bytes):
|
||||
"Test generate_channel_hash with fuzzed channel names and 128-bit keys, ensuring it produces single-byte values"
|
||||
hashed = generate_channel_hash(channel_name, key_bytes)
|
||||
assert 0 <= hashed <= 0xFF
|
||||
|
||||
@given(st.text(min_size=1, max_size=12), st.binary(min_size=32, max_size=32))
|
||||
def test_generate_channel_hash_fuzz_aes256(channel_name, key_bytes):
|
||||
"Test generate_channel_hash with fuzzed channel names and 256-bit keys, ensuring it produces single-byte values"
|
||||
hashed = generate_channel_hash(channel_name, key_bytes)
|
||||
assert 0 <= hashed <= 0xFF
|
||||
|
||||
@@ -40,6 +40,8 @@ whitelistVids = dict.fromkeys([0x239a, 0x303a])
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
DEFAULT_KEY = base64.b64decode("1PG7OiApB1nwvP+rz05pAQ==".encode("utf-8"))
|
||||
|
||||
def quoteBooleans(a_string: str) -> str:
|
||||
"""Quote booleans
|
||||
given a string that contains ": true", replace with ": 'true'" (or false)
|
||||
@@ -372,24 +374,21 @@ def channel_hash(data: bytes) -> int:
|
||||
result ^= char
|
||||
return result
|
||||
|
||||
def generate_channel_hash(name, key) -> int:
|
||||
def generate_channel_hash(name: Union[str, bytes], key: Union[str, bytes]) -> int:
|
||||
"""generate the channel number by hashing the channel name and psk (accepts str or bytes for both)"""
|
||||
# Handle key as str or bytes
|
||||
if isinstance(key, bytes):
|
||||
key = key.decode("utf-8")
|
||||
if key == "AQ==":
|
||||
key = "1PG7OiApB1nwvP+rz05pAQ=="
|
||||
replaced_key = key.replace("-", "+").replace("_", "/")
|
||||
key_bytes = base64.b64decode(replaced_key.encode("utf-8"))
|
||||
if isinstance(key, str):
|
||||
key = base64.b64decode(key.replace("-", "+").replace("_", "/").encode("utf-8"))
|
||||
|
||||
if len(key) == 1:
|
||||
key = DEFAULT_KEY[:-1] + key
|
||||
|
||||
# Handle name as str or bytes
|
||||
if isinstance(name, bytes):
|
||||
name_bytes = name
|
||||
else:
|
||||
name_bytes = name.encode("utf-8")
|
||||
if isinstance(name, str):
|
||||
name = name.encode("utf-8")
|
||||
|
||||
h_name = channel_hash(name_bytes)
|
||||
h_key = channel_hash(key_bytes)
|
||||
h_name = channel_hash(name)
|
||||
h_key = channel_hash(key)
|
||||
result: int = h_name ^ h_key
|
||||
return result
|
||||
|
||||
|
||||
Reference in New Issue
Block a user