mirror of
https://github.com/mudita/MuditaOS.git
synced 2026-05-19 14:15:02 -04:00
This PR: * provides Bluetooth-Harness API messages definition [EGD-5944] * provides Bluetooth-Harness API messages implementation * provides Bluetooth-Harness API usage on harness side [EGD-5946] * provides Bluetooth-Harness tests [EGD-5947]
96 lines
3.6 KiB
Python
96 lines
3.6 KiB
Python
# Copyright (c) 2017-2021, Mudita Sp. z.o.o. All rights reserved.
|
|
# For licensing, see https://github.com/mudita/MuditaOS/LICENSE.md
|
|
|
|
from harness.interface.defs import status
|
|
from harness import log
|
|
import time
|
|
|
|
def bt_get_state(harness):
|
|
time.sleep(1)
|
|
body = {"state": True}
|
|
ret = harness.endpoint_request("bluetooth", "get", body)
|
|
assert ret["status"] == status["OK"]
|
|
return ret["body"]["state"]
|
|
|
|
def bt_set_status(harness, power, visibility = "off"):
|
|
log.info("Turning Bluetooth {} with visibility {}...".format(power, visibility))
|
|
body = {"state": {"power" : power , "visibility" : visibility }}
|
|
ret = harness.endpoint_request("bluetooth", "put", body)
|
|
assert ret["status"] == status["OK"]
|
|
|
|
def bt_command(harness, command, http_method = "put"):
|
|
log.info("Sending command: {} with http_method={}".format(command, http_method))
|
|
body = {"command": command}
|
|
ret = harness.endpoint_request("bluetooth", http_method, body)
|
|
assert ret["status"] == status["OK"]
|
|
|
|
def bt_pair_command(harness, pair_command, address, http_method):
|
|
log.info("Requesting {}ing with address={}...".format(pair_command, address))
|
|
body = {pair_command: address}
|
|
ret = harness.endpoint_request("bluetooth", http_method, body)
|
|
assert ret["status"] == status["OK"]
|
|
|
|
def bt_get_device_by_name(devices, name) -> dict :
|
|
for device in devices:
|
|
if device["name"] == name :
|
|
return device
|
|
return {}
|
|
|
|
def bt_find_device(harness, device_origin, device_name, max_attempts = 7):
|
|
log.info("Getting {} devices".format(device_origin))
|
|
body = {"devices": device_origin}
|
|
|
|
for i in range(max_attempts):
|
|
ret = harness.endpoint_request("bluetooth", "get", body)
|
|
device = bt_get_device_by_name(ret["body"]["devices"], device_name)
|
|
if device:
|
|
log.info("Found {}, address={}".format(device_name, device.get('address')))
|
|
return device
|
|
if i != max_attempts - 1:
|
|
log.info("Device {} not found, retrying...".format(device_name))
|
|
time.sleep(2)
|
|
return {}
|
|
|
|
def bt_is_device_forgotten(harness, device_name, max_attempts = 7):
|
|
log.info("Checking if pair forgetting succeeded...")
|
|
body = {"devices": "bonded"}
|
|
|
|
for i in range(max_attempts):
|
|
ret = harness.endpoint_request("bluetooth", "get", body)
|
|
device = bt_get_device_by_name(ret["body"]["devices"], device_name)
|
|
if not device:
|
|
return True
|
|
if i != max_attempts - 1:
|
|
log.info("Device {} still paired, retrying...".format(device_name))
|
|
time.sleep(2)
|
|
return False
|
|
|
|
def bt_get_connected_address(harness, max_attempts = 10):
|
|
log.info("Getting address of connected device")
|
|
body = {"devices": "bonded"}
|
|
|
|
for i in range(max_attempts):
|
|
ret = harness.endpoint_request("bluetooth", "get", body)
|
|
address = ret["body"]["address"]
|
|
if len(address) > 0:
|
|
log.info("Device connected={}".format(address))
|
|
return address
|
|
if i != max_attempts - 1:
|
|
log.info("No device connected, retrying...")
|
|
time.sleep(2)
|
|
return ""
|
|
|
|
def bt_is_device_disconnected(harness, max_attempts = 7):
|
|
log.info("Checking if disconnecting succeeded...")
|
|
body = {"devices": "bonded"}
|
|
|
|
for i in range(max_attempts):
|
|
ret = harness.endpoint_request("bluetooth", "get", body)
|
|
address = ret["body"]["address"]
|
|
if len(address) == 0:
|
|
return True
|
|
if i != max_attempts - 1:
|
|
log.info("Device {} connected, retrying...".format(address))
|
|
time.sleep(2)
|
|
return False
|