mirror of
https://github.com/meshtastic/python.git
synced 2026-02-19 23:25:21 -05:00
captured packets and simulate what server would send
This commit is contained in:
@@ -38,8 +38,6 @@ class StreamInterface(MeshInterface):
|
||||
self._rxBuf = bytes() # empty
|
||||
self._wantExit = False
|
||||
|
||||
self.stream = None
|
||||
|
||||
# FIXME, figure out why daemon=True causes reader thread to exit too early
|
||||
self._rxThread = threading.Thread(target=self.__reader, args=(), daemon=True)
|
||||
|
||||
@@ -105,6 +103,7 @@ class StreamInterface(MeshInterface):
|
||||
bufLen = len(b)
|
||||
# We convert into a string, because the TCP code doesn't work with byte arrays
|
||||
header = bytes([START1, START2, (bufLen >> 8) & 0xff, bufLen & 0xff])
|
||||
logging.debug(f'sending header:{header} b:{b}')
|
||||
self._writeBytes(header + b)
|
||||
|
||||
def close(self):
|
||||
@@ -127,9 +126,10 @@ class StreamInterface(MeshInterface):
|
||||
logging.debug("reading character")
|
||||
b = self._readBytes(1)
|
||||
logging.debug("In reader loop")
|
||||
logging.debug(f"read returned {b}")
|
||||
#logging.debug(f"read returned {b}")
|
||||
if len(b) > 0:
|
||||
c = b[0]
|
||||
#logging.debug(f'c:{c}')
|
||||
ptr = len(self._rxBuf)
|
||||
|
||||
# Assume we want to append this byte, fixme use bytearray instead
|
||||
@@ -148,6 +148,7 @@ class StreamInterface(MeshInterface):
|
||||
if c != START2:
|
||||
self._rxBuf = empty # failed to find start2
|
||||
elif ptr >= HEADER_LEN - 1: # we've at least got a header
|
||||
#logging.debug('at least we received a header')
|
||||
# big endian length follows header
|
||||
packetlen = (self._rxBuf[2] << 8) + self._rxBuf[3]
|
||||
|
||||
|
||||
@@ -27,7 +27,7 @@ def test_StreamInterface_with_noProto(caplog, reset_globals):
|
||||
test_data = b'hello'
|
||||
stream.read.return_value = test_data
|
||||
with caplog.at_level(logging.DEBUG):
|
||||
iface = StreamInterface(noProto=True)
|
||||
iface = StreamInterface(noProto=True, connectNow=False)
|
||||
iface.stream = stream
|
||||
iface._writeBytes(test_data)
|
||||
data = iface._readBytes(len(test_data))
|
||||
@@ -35,12 +35,39 @@ def test_StreamInterface_with_noProto(caplog, reset_globals):
|
||||
|
||||
|
||||
# Note: This takes a bit, so moving from unit to slow
|
||||
# Tip: If you want to see the print output, run with '-s' flag:
|
||||
# pytest -s meshtastic/tests/test_stream_interface.py::test_sendToRadioImpl
|
||||
@pytest.mark.unitslow
|
||||
def test_sendToRadioImpl(caplog, reset_globals):
|
||||
"""Test _sendToRadioImpl()"""
|
||||
|
||||
# def add_header(b):
|
||||
# """Add header stuffs for radio"""
|
||||
# bufLen = len(b)
|
||||
# header = bytes([START1, START2, (bufLen >> 8) & 0xff, bufLen & 0xff])
|
||||
# return header + b
|
||||
|
||||
# captured raw bytes of a Heltec2.1 radio with 2 channels (primary and a secondary channel named "gpio")
|
||||
raw_1_my_info = b'\x1a,\x08\xdc\x8c\xd5\xc5\x02\x18\r2\x0e1.2.49.5354c49P\x15]\xe1%\x17Eh\xe0\xa7\x12p\xe8\x9d\x01x\x08\x90\x01\x01'
|
||||
raw_2_node_info = b'"9\x08\xdc\x8c\xd5\xc5\x02\x12(\n\t!28b5465c\x12\x0cUnknown 465c\x1a\x03?5C"\x06$o(\xb5F\\0\n\x1a\x02 1%M<\xc6a'
|
||||
# pylint: disable=C0301
|
||||
raw_3_node_info = b'"C\x08\xa4\x8c\xd5\xc5\x02\x12(\n\t!28b54624\x12\x0cUnknown 4624\x1a\x03?24"\x06$o(\xb5F$0\n\x1a\x07 5MH<\xc6a%G<\xc6a=\x00\x00\xc0@'
|
||||
raw_4_complete = b'@\xcf\xe5\xd1\x8c\x0e'
|
||||
# pylint: disable=C0301
|
||||
raw_5_prefs = b'Z6\r\\F\xb5(\x15\\F\xb5("\x1c\x08\x06\x12\x13*\x11\n\x0f0\x84\x07P\xac\x02\x88\x01\x01\xb0\t#\xb8\t\x015]$\xddk5\xd5\x7f!b=M<\xc6aP\x03`F'
|
||||
# pylint: disable=C0301
|
||||
raw_6_channel0 = b'Z.\r\\F\xb5(\x15\\F\xb5("\x14\x08\x06\x12\x0b:\t\x12\x05\x18\x01"\x01\x01\x18\x015^$\xddk5\xd6\x7f!b=M<\xc6aP\x03`F'
|
||||
# pylint: disable=C0301
|
||||
raw_7_channel1 = b'ZS\r\\F\xb5(\x15\\F\xb5("9\x08\x06\x120:.\x08\x01\x12(" \xb4&\xb3\xc7\x06\xd8\xe39%\xba\xa5\xee\x8eH\x06\xf6\xf4H\xe8\xd5\xc1[ao\xb5Y\\\xb4"\xafmi*\x04gpio\x18\x025_$\xddk5\xd7\x7f!b=M<\xc6aP\x03`F'
|
||||
raw_8_channel2 = b'Z)\r\\F\xb5(\x15\\F\xb5("\x0f\x08\x06\x12\x06:\x04\x08\x02\x12\x005`$\xddk5\xd8\x7f!b=M<\xc6aP\x03`F'
|
||||
raw_blank = b''
|
||||
|
||||
test_data = b'hello'
|
||||
stream = MagicMock()
|
||||
stream.read.return_value = test_data
|
||||
#stream.read.return_value = add_header(test_data)
|
||||
stream.read.side_effect = [ raw_1_my_info, raw_2_node_info, raw_3_node_info, raw_4_complete,
|
||||
raw_5_prefs, raw_6_channel0, raw_7_channel1, raw_8_channel2,
|
||||
raw_blank, raw_blank]
|
||||
toRadio = MagicMock()
|
||||
toRadio.SerializeToString.return_value = test_data
|
||||
with caplog.at_level(logging.DEBUG):
|
||||
@@ -51,3 +78,4 @@ def test_sendToRadioImpl(caplog, reset_globals):
|
||||
assert re.search(r'Sending: ', caplog.text, re.MULTILINE)
|
||||
assert re.search(r'reading character', caplog.text, re.MULTILINE)
|
||||
assert re.search(r'In reader loop', caplog.text, re.MULTILINE)
|
||||
print(caplog.text)
|
||||
|
||||
Reference in New Issue
Block a user