refactor of Tunnel() for unit testing; create unit tests for Tunnel()

This commit is contained in:
Mike Kinney
2021-12-30 21:24:32 -08:00
parent 3f307880f9
commit d366e74e86
7 changed files with 205 additions and 122 deletions

View File

@@ -39,7 +39,10 @@ class StreamInterface(MeshInterface):
self._wantExit = False
# FIXME, figure out why daemon=True causes reader thread to exit too early
self._rxThread = threading.Thread(target=self.__reader, args=(), daemon=True)
if noProto:
self._rxThread = None
else:
self._rxThread = threading.Thread(target=self.__reader, args=(), daemon=True)
MeshInterface.__init__(self, debugOut=debugOut, noProto=noProto)
@@ -65,7 +68,8 @@ class StreamInterface(MeshInterface):
if not self.noProto:
time.sleep(0.1) # wait 100ms to give device time to start running
self._rxThread.start()
if not self.noProto:
self._rxThread.start()
self._startConfig()
@@ -117,8 +121,9 @@ class StreamInterface(MeshInterface):
# pyserial cancel_read doesn't seem to work, therefore we ask the
# reader thread to close things for us
self._wantExit = True
if self._rxThread != threading.current_thread():
self._rxThread.join() # wait for it to exit
if not self.noProto:
if self._rxThread != threading.current_thread():
self._rxThread.join() # wait for it to exit
def __reader(self):
"""The reader thread that reads bytes from our stream"""