mirror of
https://github.com/meshtastic/python.git
synced 2025-12-28 18:37:52 -05:00
183 lines
7.5 KiB
Python
183 lines
7.5 KiB
Python
"""Classes for logging power consumption of meshtastic devices."""
|
|
|
|
import logging
|
|
import threading
|
|
import time
|
|
from typing import Optional
|
|
|
|
from ppk2_api import ppk2_api # type: ignore[import-untyped]
|
|
|
|
from .power_supply import PowerError, PowerSupply
|
|
|
|
|
|
class PPK2PowerSupply(PowerSupply):
|
|
"""Interface for talking with the NRF PPK2 high-resolution micro-power supply.
|
|
Power Profiler Kit II is what you should google to find it for purchase.
|
|
"""
|
|
|
|
def __init__(self, portName: Optional[str] = None):
|
|
"""Initialize the PowerSupply object.
|
|
|
|
portName (str, optional): The port name of the power supply. Defaults to "/dev/ttyACM0".
|
|
"""
|
|
if not portName:
|
|
devs = ppk2_api.PPK2_API.list_devices()
|
|
if not devs or len(devs) == 0:
|
|
raise PowerError("No PPK2 devices found")
|
|
elif len(devs) > 1:
|
|
raise PowerError(
|
|
"Multiple PPK2 devices found, please specify the portName"
|
|
)
|
|
else:
|
|
portName = devs[0]
|
|
|
|
self.measuring = False
|
|
self.current_max = 0
|
|
self.current_min = 0
|
|
self.current_sum = 0
|
|
self.current_num_samples = 0
|
|
self.current_average = 0
|
|
|
|
# for tracking avera data read length (to determine if we are sleeping efficiently in measurement_loop)
|
|
self.total_data_len = 0
|
|
self.num_data_reads = 0
|
|
self.max_data_len = 0
|
|
|
|
# Normally we just sleep with a timeout on this condition (polling the power measurement data repeatedly)
|
|
# but any time our measurements have been fully consumed (via reset_measurements) we notify() this condition
|
|
# to trigger a new reading ASAP.
|
|
self._want_measurement = threading.Condition()
|
|
|
|
# To guard against a brief window while updating measured values
|
|
self._result_lock = threading.Condition()
|
|
|
|
self.r = r = ppk2_api.PPK2_API(
|
|
portName
|
|
) # serial port will be different for you
|
|
r.get_modifiers()
|
|
|
|
self.measurement_thread = threading.Thread(
|
|
target=self.measurement_loop, daemon=True, name="ppk2 measurement"
|
|
)
|
|
logging.info("Connected to Power Profiler Kit II (PPK2)")
|
|
super().__init__() # we call this late so that the port is already open and _getRawWattHour callback works
|
|
|
|
def measurement_loop(self):
|
|
"""Endless measurement loop will run in a thread."""
|
|
while self.measuring:
|
|
with self._want_measurement:
|
|
self._want_measurement.wait(
|
|
0.0001 if self.num_data_reads == 0 else 0.001
|
|
)
|
|
# normally we poll using this timeout, but sometimes
|
|
# reset_measurement() will notify us to read immediately
|
|
|
|
# always reads 4096 bytes, even if there is no new samples - or possibly the python single thread (because of global interpreter lock)
|
|
# is always behind and thefore we are inherently dropping samples semi randomly!!!
|
|
read_data = self.r.get_data()
|
|
if read_data != b"":
|
|
samples, _ = self.r.get_samples(read_data)
|
|
|
|
# update invariants
|
|
if len(samples) > 0:
|
|
if self.current_num_samples == 0:
|
|
# First set of new reads, reset min/max
|
|
self.current_max = 0
|
|
self.current_min = samples[0]
|
|
# we need at least one sample to get an initial min
|
|
|
|
# The following operations could be expensive, so do outside of the lock
|
|
# FIXME - change all these lists into numpy arrays to use lots less CPU
|
|
self.current_max = max(self.current_max, max(samples))
|
|
self.current_min = min(self.current_min, min(samples))
|
|
latest_sum = sum(samples)
|
|
with self._result_lock:
|
|
self.current_sum += latest_sum
|
|
self.current_num_samples += len(samples)
|
|
# logging.debug(f"PPK2 data_len={len(read_data)}, sample_len={len(samples)}")
|
|
|
|
self.num_data_reads += 1
|
|
self.total_data_len += len(read_data)
|
|
self.max_data_len = max(self.max_data_len, len(read_data))
|
|
|
|
def get_min_current_mA(self):
|
|
"""Return the min current in mA."""
|
|
return self.current_min / 1000
|
|
|
|
def get_max_current_mA(self):
|
|
"""Return the max current in mA."""
|
|
return self.current_max / 1000
|
|
|
|
def get_average_current_mA(self):
|
|
"""Return the average current in mA."""
|
|
with self._result_lock:
|
|
if self.current_num_samples != 0:
|
|
# If we have new samples, calculate a new average
|
|
self.current_average = self.current_sum / self.current_num_samples
|
|
|
|
# Even if we don't have new samples, return the last calculated average
|
|
# measurements are in microamperes, divide by 1000
|
|
return self.current_average / 1000
|
|
|
|
def reset_measurements(self):
|
|
"""Reset current measurements."""
|
|
# Use the last reading as the new only reading (to ensure we always have a valid current reading)
|
|
self.current_sum = 0
|
|
self.current_num_samples = 0
|
|
|
|
# if self.num_data_reads:
|
|
# logging.debug(f"max data len = {self.max_data_len},avg {self.total_data_len/self.num_data_reads}, num reads={self.num_data_reads}")
|
|
# Summary stats for performance monitoring
|
|
self.num_data_reads = 0
|
|
self.total_data_len = 0
|
|
self.max_data_len = 0
|
|
|
|
with self._want_measurement:
|
|
self._want_measurement.notify() # notify the measurement loop to read immediately
|
|
|
|
def close(self) -> None:
|
|
"""Close the power meter."""
|
|
self.measuring = False
|
|
self.r.stop_measuring() # send command to ppk2
|
|
self.measurement_thread.join() # wait for our thread to finish
|
|
super().close()
|
|
|
|
def setIsSupply(self, is_supply: bool):
|
|
"""If in supply mode we will provide power ourself, otherwise we are just an amp meter."""
|
|
|
|
assert self.v > 0.8 # We must set a valid voltage before calling this method
|
|
|
|
self.r.set_source_voltage(
|
|
int(self.v * 1000)
|
|
) # set source voltage in mV BEFORE setting source mode
|
|
# Note: source voltage must be set even if we are using the amp meter mode
|
|
|
|
# must be after setting source voltage and before setting mode
|
|
self.r.start_measuring() # send command to ppk2
|
|
|
|
if (
|
|
not is_supply
|
|
): # min power outpuf of PPK2. If less than this assume we want just meter mode.
|
|
self.r.use_ampere_meter()
|
|
else:
|
|
self.r.use_source_meter() # set source meter mode
|
|
|
|
if not self.measurement_thread.is_alive():
|
|
self.measuring = True
|
|
self.reset_measurements()
|
|
|
|
# We can't start reading from the thread until vdd is set, so start running the thread now
|
|
self.measurement_thread.start()
|
|
time.sleep(
|
|
0.2
|
|
) # FIXME - crufty way to ensure we do one set of reads to discard bogus fake power readings in the FIFO
|
|
self.reset_measurements()
|
|
|
|
def powerOn(self):
|
|
"""Power on the supply."""
|
|
self.r.toggle_DUT_power("ON")
|
|
|
|
def powerOff(self):
|
|
"""Power off the supply."""
|
|
self.r.toggle_DUT_power("OFF")
|