diff --git a/.vscode/launch.json b/.vscode/launch.json index 9b4dca4..3b82225 100644 --- a/.vscode/launch.json +++ b/.vscode/launch.json @@ -204,7 +204,7 @@ "request": "launch", "module": "meshtastic", "justMyCode": false, - "args": ["--slog-out", "default", "--power-ppk2-meter", "--power-stress", "--power-voltage", "3.3", "--seriallog"] + "args": ["--slog", "--power-ppk2-meter", "--power-stress", "--power-voltage", "3.3"] }, { "name": "meshtastic test", diff --git a/.vscode/settings.json b/.vscode/settings.json index 8fbd0a0..db434be 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -2,6 +2,7 @@ "cSpell.words": [ "bitmask", "boardid", + "DEEPSLEEP", "Meshtastic", "milliwatt", "portnums", diff --git a/meshtastic/__main__.py b/meshtastic/__main__.py index 87d23cb..3873e44 100644 --- a/meshtastic/__main__.py +++ b/meshtastic/__main__.py @@ -852,14 +852,16 @@ def onConnected(interface): qr = pyqrcode.create(url) print(qr.terminal()) + log_set: Optional[LogSet] = None # we need to keep a reference to the logset so it doesn't get GCed early if args.slog or args.power_stress: # Setup loggers global meter # pylint: disable=global-variable-not-assigned - LogSet(interface, args.slog if args.slog != 'default' else None, meter) + log_set = LogSet(interface, args.slog if args.slog != 'default' else None, meter) if args.power_stress: stress = PowerStress(interface) stress.run() + closeNow = True # exit immediately after stress test if args.listen: closeNow = False @@ -895,6 +897,8 @@ def onConnected(interface): # if the user didn't ask for serial debugging output, we might want to exit after we've done our operation if (not args.seriallog) and closeNow: interface.close() # after running command then exit + if log_set: + log_set.close() except Exception as ex: print(f"Aborting due to: {ex}") diff --git a/meshtastic/powermon/stress.py b/meshtastic/powermon/stress.py index 665e58b..2a07c63 100644 --- a/meshtastic/powermon/stress.py +++ b/meshtastic/powermon/stress.py @@ -3,7 +3,7 @@ import logging import time -from ..protobuf import ( portnums_pb2, powermon_pb2 ) +from ..protobuf import portnums_pb2, powermon_pb2 def onPowerStressResponse(packet, interface): @@ -53,6 +53,32 @@ class PowerStressClient: onResponseAckPermitted=True, ) + def syncPowerStress( + self, + cmd: powermon_pb2.PowerStressMessage.Opcode.ValueType, + num_seconds: float = 0.0, + ): + """Send a power stress command and wait for the ack.""" + gotAck = False + + def onResponse(packet: dict): # pylint: disable=unused-argument + nonlocal gotAck + gotAck = True + + logging.info(f"Sending power stress command {powermon_pb2.PowerStressMessage.Opcode.Name(cmd)}") + self.sendPowerStress(cmd, onResponse=onResponse, num_seconds=num_seconds) + + if num_seconds == 0.0: + # Wait for the response and then continue + while not gotAck: + time.sleep(0.1) + else: + # we wait a little bit longer than the time the UUT would be waiting (to make sure all of its messages are handled first) + time.sleep(num_seconds + 0.2) # completely block our thread for the duration of the test + if not gotAck: + logging.error("Did not receive ack for power stress command!") + + class PowerStress: """Walk the UUT through a set of power states so we can capture repeatable power consumption measurements.""" @@ -63,19 +89,21 @@ class PowerStress: def run(self): """Run the power stress test.""" # Send the power stress command - gotAck = False - def onResponse(packet: dict): # pylint: disable=unused-argument - nonlocal gotAck - gotAck = True + self.client.syncPowerStress(powermon_pb2.PowerStressMessage.PRINT_INFO) - logging.info("Starting power stress test, attempting to contact UUT...") - self.client.sendPowerStress( - powermon_pb2.PowerStressMessage.PRINT_INFO, onResponse=onResponse - ) + num_seconds = 5.0 + states = [ + powermon_pb2.PowerStressMessage.LED_ON, + powermon_pb2.PowerStressMessage.BT_OFF, + powermon_pb2.PowerStressMessage.BT_ON, + powermon_pb2.PowerStressMessage.CPU_FULLON, + powermon_pb2.PowerStressMessage.CPU_IDLE, + powermon_pb2.PowerStressMessage.CPU_DEEPSLEEP, + ] + for s in states: + s_name = powermon_pb2.PowerStressMessage.Opcode.Name(s) + logging.info(f"Running power stress test {s_name} for {num_seconds} seconds") + self.client.syncPowerStress(s, num_seconds) - # Wait for the response - while not gotAck: - time.sleep(0.1) - - logging.info("Power stress test complete.") + logging.info("Power stress test complete.") \ No newline at end of file