Store much higher (time) res power readings any time we've just fetched

new readings.  This allows for better plotting/analysis but still keeping
runtime polling low.
This commit is contained in:
Kevin Hester
2024-07-10 16:44:27 -07:00
parent 0bc608d8cf
commit d0db5cae13
2 changed files with 46 additions and 32 deletions

2
.vscode/launch.json vendored
View File

@@ -204,7 +204,7 @@
"request": "launch",
"module": "meshtastic",
"justMyCode": false,
"args": ["--slog", "--power-ppk2-supply", "--power-stress", "--power-voltage", "3.3", "--seriallog", "--ble"]
"args": ["--slog", "--power-ppk2-supply", "--power-stress", "--power-voltage", "3.3", "--ble"]
},
{
"name": "meshtastic test",

View File

@@ -9,6 +9,7 @@ from ppk2_api import ppk2_api # type: ignore[import-untyped]
from .power_supply import PowerError, PowerSupply
class PPK2PowerSupply(PowerSupply):
"""Interface for talking with the NRF PPK2 high-resolution micro-power supply.
Power Profiler Kit II is what you should google to find it for purchase.
@@ -35,12 +36,18 @@ class PPK2PowerSupply(PowerSupply):
self.current_min = 0
self.current_sum = 0
self.current_num_samples = 0
self.current_average = 0
# for tracking avera data read length (to determine if we are sleeping efficiently in measurement_loop)
self.total_data_len = 0
self.num_data_reads = 0
self.max_data_len = 0
# Normally we just sleep with a timeout on this condition (polling the power measurement data repeatedly)
# but any time our measurements have been fully consumed (via reset_measurements) we notify() this condition
# to trigger a new reading ASAP.
self.want_measurement = threading.Condition()
self.r = r = ppk2_api.PPK2_API(
portName
) # serial port will be different for you
@@ -55,52 +62,57 @@ class PPK2PowerSupply(PowerSupply):
def measurement_loop(self):
"""Endless measurement loop will run in a thread."""
while self.measuring:
# always reads 4096 bytes, even if there is no new samples - or possibly the python single thread (because of global interpreter lock)
# is always behind and thefore we are inherently dropping samples semi randomly!!!
read_data = self.r.get_data()
if read_data != b"":
samples, _ = self.r.get_samples(read_data)
with self.want_measurement:
self.want_measurement.wait(0.0001 if self.num_data_reads == 0 else 0.01)
# normally we poll using this timeout, but sometimes
# reset_measurement() will notify us to read immediately
# update invariants
if len(samples) > 0:
if self.current_num_samples == 0:
self.current_min = samples[
0
] # we need at least one sample to get an initial min
self.current_max = max(self.current_max, max(samples))
self.current_min = min(self.current_min, min(samples))
self.current_sum += sum(samples)
self.current_num_samples += len(samples)
# logging.debug(f"PPK2 data_len={len(read_data)}, sample_len={len(samples)}")
# always reads 4096 bytes, even if there is no new samples - or possibly the python single thread (because of global interpreter lock)
# is always behind and thefore we are inherently dropping samples semi randomly!!!
read_data = self.r.get_data()
if read_data != b"":
samples, _ = self.r.get_samples(read_data)
self.num_data_reads += 1
self.total_data_len += len(read_data)
self.max_data_len = max(self.max_data_len, len(read_data))
# update invariants
if len(samples) > 0:
if (
self.current_num_samples == 0
): # First set of new reads, reset min/max
self.current_max = 0
self.current_min = samples[
0
] # we need at least one sample to get an initial min
self.current_max = max(self.current_max, max(samples))
self.current_min = min(self.current_min, min(samples))
self.current_sum += sum(samples)
self.current_num_samples += len(samples)
# logging.debug(f"PPK2 data_len={len(read_data)}, sample_len={len(samples)}")
time.sleep(0.01) # FIXME figure out correct sleep duration
self.num_data_reads += 1
self.total_data_len += len(read_data)
self.max_data_len = max(self.max_data_len, len(read_data))
def get_min_current_mA(self):
"""Returns max current in mA (since last call to this method)."""
"""Return the min current in mA."""
return self.current_min / 1000
def get_max_current_mA(self):
"""Returns max current in mA (since last call to this method)."""
"""Return the max current in mA."""
return self.current_max / 1000
def get_average_current_mA(self):
"""Returns average current in mA (since last call to this method)."""
if self.current_num_samples == 0:
return 0
else:
return (
self.current_sum / self.current_num_samples / 1000
) # measurements are in microamperes, divide by 1000
"""Return the average current in mA."""
if self.current_num_samples != 0:
# If we have new samples, calculate a new average
self.current_average = self.current_sum / self.current_num_samples
# Even if we don't have new samples, return the last calculated average
# measurements are in microamperes, divide by 1000
return self.current_average / 1000
def reset_measurements(self):
"""Reset current measurements."""
# Use the last reading as the new only reading (to ensure we always have a valid current reading)
self.current_max = 0
self.current_min = 0
self.current_sum = 0
self.current_num_samples = 0
@@ -110,6 +122,8 @@ class PPK2PowerSupply(PowerSupply):
self.num_data_reads = 0
self.total_data_len = 0
self.max_data_len = 0
with self.want_measurement:
self.want_measurement.notify() # notify the measurement loop to read immediately
def close(self) -> None:
"""Close the power meter."""