initial test_ble_interface; add another unit test

This commit is contained in:
Mike Kinney
2021-12-15 16:15:46 -08:00
parent 182a9b89f3
commit ef9a97d1f7
6 changed files with 69 additions and 43 deletions

View File

@@ -575,7 +575,7 @@ def common():
subscribe()
if args.ble:
client = BLEInterface(args.ble, debugOut=logfile)
client = BLEInterface(args.ble, debugOut=logfile, noProto=args.noproto)
elif args.host:
client = TCPInterface(
args.host, debugOut=logfile, noProto=args.noproto)

View File

@@ -15,22 +15,27 @@ FROMNUM_UUID = "ed9da18c-a800-4f66-a670-aa7547e34453"
class BLEInterface(MeshInterface):
"""A not quite ready - FIXME - BLE interface to devices"""
def __init__(self, address, debugOut=None):
def __init__(self, address, noProto=False, debugOut=None):
self.address = address
self.adapter = pygatt.GATTToolBackend() # BGAPIBackend()
self.adapter.start()
logging.debug(f"Connecting to {self.address}")
self.device = self.adapter.connect(address)
if not noProto:
self.adapter = pygatt.GATTToolBackend() # BGAPIBackend()
self.adapter.start()
logging.debug(f"Connecting to {self.address}")
self.device = self.adapter.connect(address)
else:
self.adapter = None
self.device = None
logging.debug("Connected to device")
# fromradio = self.device.char_read(FROMRADIO_UUID)
MeshInterface.__init__(self, debugOut=debugOut)
MeshInterface.__init__(self, debugOut=debugOut, noProto=noProto)
self._readFromRadio() # read the initial responses
def handle_data(handle, data):
self._handleFromRadio(data)
self.device.subscribe(FROMNUM_UUID, callback=handle_data)
if self.device:
self.device.subscribe(FROMNUM_UUID, callback=handle_data)
def _sendToRadioImpl(self, toRadio):
"""Send a ToRadio protobuf to the device"""
@@ -40,12 +45,15 @@ class BLEInterface(MeshInterface):
def close(self):
MeshInterface.close(self)
self.adapter.stop()
if self.adapter:
self.adapter.stop()
def _readFromRadio(self):
wasEmpty = False
while not wasEmpty:
b = self.device.char_read(FROMRADIO_UUID)
wasEmpty = len(b) == 0
if not wasEmpty:
self._handleFromRadio(b)
if not self.noProto:
wasEmpty = False
while not wasEmpty:
if self.device:
b = self.device.char_read(FROMRADIO_UUID)
wasEmpty = len(b) == 0
if not wasEmpty:
self._handleFromRadio(b)

View File

@@ -101,39 +101,38 @@ class MeshInterface:
return timeago.format(datetime.fromtimestamp(ts), datetime.now()) if ts else None
rows = []
for node in self.nodes.values():
if not includeSelf and node['num'] == self.localNode.nodeNum:
continue
if self.nodes:
for node in self.nodes.values():
if not includeSelf and node['num'] == self.localNode.nodeNum:
continue
row = {"N": 0}
row = {"N": 0}
user = node.get('user')
if user:
row.update({
"User": user['longName'],
"AKA": user['shortName'],
"ID": user['id'],
})
pos = node.get('position')
if pos:
row.update({
"Latitude": formatFloat(pos.get("latitude"), 4, "°"),
"Longitude": formatFloat(pos.get("longitude"), 4, "°"),
"Altitude": formatFloat(pos.get("altitude"), 0, " m"),
"Battery": formatFloat(pos.get("batteryLevel"), 2, "%"),
})
user = node.get('user')
if user:
row.update({
"User": user['longName'],
"AKA": user['shortName'],
"ID": user['id'],
"SNR": formatFloat(node.get("snr"), 2, " dB"),
"LastHeard": getLH(node.get("lastHeard")),
"Since": getTimeAgo(node.get("lastHeard")),
})
pos = node.get('position')
if pos:
row.update({
"Latitude": formatFloat(pos.get("latitude"), 4, "°"),
"Longitude": formatFloat(pos.get("longitude"), 4, "°"),
"Altitude": formatFloat(pos.get("altitude"), 0, " m"),
"Battery": formatFloat(pos.get("batteryLevel"), 2, "%"),
})
rows.append(row)
row.update({
"SNR": formatFloat(node.get("snr"), 2, " dB"),
"LastHeard": getLH(node.get("lastHeard")),
"Since": getTimeAgo(node.get("lastHeard")),
})
rows.append(row)
# Why doesn't this way work?
#rows.sort(key=lambda r: r.get('LastHeard', '0000'), reverse=True)
rows.sort(key=lambda r: r.get('LastHeard') or '0000', reverse=True)
for i, row in enumerate(rows):
row['N'] = i+1

View File

@@ -0,0 +1,12 @@
"""Meshtastic unit tests for ble_interface.py"""
import pytest
from ..ble_interface import BLEInterface
@pytest.mark.unit
def test_BLEInterface():
"""Test that we can instantiate a BLEInterface"""
iface = BLEInterface('foo', debugOut=True, noProto=True)
iface.close()

View File

@@ -13,6 +13,8 @@ def test_MeshInterface(capsys):
iface = MeshInterface(noProto=True)
iface.showInfo()
iface.localNode.showInfo()
iface.showNodes()
iface.close()
out, err = capsys.readouterr()
assert re.search(r'Owner: None \(None\)', out, re.MULTILINE)
assert re.search(r'Nodes', out, re.MULTILINE)
@@ -20,4 +22,3 @@ def test_MeshInterface(capsys):
assert re.search(r'Channels', out, re.MULTILINE)
assert re.search(r'Primary channel URL', out, re.MULTILINE)
assert err == ''
iface.close()

View File

@@ -84,6 +84,12 @@ def test_pskToString_many_bytes():
assert pskToString(bytes([0x02, 0x01])) == 'secret'
@pytest.mark.unit
def test_pskToString_simple():
"""Test pskToString simple"""
assert pskToString(bytes([0x03])) == 'simple2'
@pytest.mark.unit
def test_our_exit_zero_return_value():
"""Test our_exit with a zero return value"""