Convert test_accum, test_aggregate, and test_series

To use pytest
This commit is contained in:
Tom Keffer
2025-12-30 13:10:26 -08:00
parent 1847cb0563
commit bdcda0ee55
3 changed files with 480 additions and 503 deletions

View File

@@ -6,11 +6,11 @@
"""Test module weewx.accum"""
import math
import time
import unittest
import pytest
import gen_fake_data
import weewx.accum
from gen_fake_data import genFakeRecords
from gen_fake_data import gen_fake_records
from weeutil.weeutil import TimeSpan
# 30 minutes worth of data:
@@ -18,20 +18,21 @@ start_ts = int(time.mktime((2009, 1, 1, 0, 0, 0, 0, 0, -1)))
stop_ts = int(time.mktime((2009, 1, 1, 0, 30, 0, 0, 0, -1)))
class StatsTest(unittest.TestCase):
class TestStats:
def setUp(self):
@pytest.fixture(autouse=True)
def setup(self):
# The data set is a list of faked records at 5 second intervals
self.dataset = list(genFakeRecords(start_ts=start_ts + 5, stop_ts=stop_ts, interval=5))
self.dataset = list(gen_fake_records(start_ts=start_ts + 5, stop_ts=stop_ts, interval=5))
def test_scalarStats(self):
ss = weewx.accum.ScalarStats()
# Make sure the default values work:
self.assertEqual(ss.min, None)
self.assertEqual(ss.last, None)
assert ss.min is None
assert ss.last is None
tmin = tmintime = None
tsum = tcount = 0
@@ -57,29 +58,29 @@ class StatsTest(unittest.TestCase):
# Some of these tests look for "almost equal", because of the rounding errors introduced by
# conversion to a string and back
self.assertAlmostEqual(ss.min, tmin, 6)
self.assertEqual(ss.mintime, tmintime)
assert ss.min == pytest.approx(tmin, abs=1e-6)
assert ss.mintime == tmintime
# Assumes the last data point is not None:
self.assertAlmostEqual(ss.last, self.dataset[-1]['outTemp'], 6)
self.assertEqual(ss.lasttime, self.dataset[-1]['dateTime'])
assert ss.last == pytest.approx(self.dataset[-1]['outTemp'], abs=1e-6)
assert ss.lasttime == self.dataset[-1]['dateTime']
self.assertAlmostEqual(ss.sum, tsum, 6)
self.assertEqual(ss.count, tcount)
self.assertAlmostEqual(ss.avg, tsum / tcount, 6)
assert ss.sum == pytest.approx(tsum, abs=1e-6)
assert ss.count == tcount
assert ss.avg == pytest.approx(tsum / tcount, abs=1e-6)
# Merge ss into its self. Should leave highs and lows unchanged, but double counts:
ss.mergeHiLo(ss)
ss.mergeSum(ss)
self.assertAlmostEqual(ss.min, tmin, 6)
self.assertEqual(ss.mintime, tmintime)
assert ss.min == pytest.approx(tmin, abs=1e-6)
assert ss.mintime == tmintime
self.assertAlmostEqual(ss.last, self.dataset[-1]['outTemp'], 6)
self.assertEqual(ss.lasttime, self.dataset[-1]['dateTime'])
assert ss.last == pytest.approx(self.dataset[-1]['outTemp'], abs=1e-6)
assert ss.lasttime == self.dataset[-1]['dateTime']
self.assertAlmostEqual(ss.sum, 2 * tsum, 6)
self.assertEqual(ss.count, 2 * tcount)
assert ss.sum == pytest.approx(2 * tsum, abs=1e-6)
assert ss.count == 2 * tcount
def test_null_wind_gust_dir(self):
# If LOOP packets windGustDir=None, the accumulator should not substitute windDir.
@@ -95,7 +96,7 @@ class StatsTest(unittest.TestCase):
# Extract the record out of the accumulator
accum_record = accum.getRecord()
# windGustDir should match the windDir seen at max wind:
self.assertIsNone(accum_record['windGustDir'])
assert accum_record['windGustDir'] is None
def test_no_wind_gust_dir(self):
# If LOOP packets do not have windGustDir at all, then the accumulator is supposed to
@@ -118,7 +119,7 @@ class StatsTest(unittest.TestCase):
# Extract the record out of the accumulator
accum_record = accum.getRecord()
# windGustDir should match the windDir seen at max wind:
self.assertEqual(accum_record['windGustDir'], windMaxDir)
assert accum_record['windGustDir'] == windMaxDir
def test_issue_737(self):
accum = weewx.accum.Accum(TimeSpan(start_ts, stop_ts))
@@ -127,16 +128,19 @@ class StatsTest(unittest.TestCase):
accum.addRecord(packet)
# Extract the record out of the accumulator
record = accum.getRecord()
self.assertIsNone(record['windrun'])
assert record['windrun'] is None
class AccumTest(unittest.TestCase):
class TestAccum:
def setUp(self):
@pytest.fixture(autouse=True)
def setup(self):
# The data set is a list of faked records at 5 second intervals. The stage of the weather cycle
# is set so that some rain will appear.
self.dataset = list(genFakeRecords(start_ts=start_ts + 5, stop_ts=stop_ts, interval=5,
weather_phase_offset=gen_fake_data.weather_cycle * math.pi / 2.0))
weather_cycle = 3600 * 24.0 * 4
self.dataset = list(gen_fake_records(start_ts=start_ts + 5, stop_ts=stop_ts, interval=5,
weather_cycle=weather_cycle,
weather_phase_offset=weather_cycle * math.pi / 2.0))
def test_Accum_getRecord(self):
"""Test extraction of record from an accumulator."""
@@ -145,8 +149,8 @@ class AccumTest(unittest.TestCase):
accum.addRecord(record)
extracted = accum.getRecord()
self.assertEqual(extracted['dateTime'], self.dataset[-1]['dateTime'])
self.assertEqual(extracted['usUnits'], weewx.US)
assert extracted['dateTime'] == self.dataset[-1]['dateTime']
assert extracted['usUnits'] == weewx.US
sum_t = 0
count_t = 0
@@ -154,7 +158,7 @@ class AccumTest(unittest.TestCase):
if rec['outTemp'] is not None:
sum_t += rec['outTemp']
count_t += 1
self.assertEqual(extracted['outTemp'], sum_t / count_t)
assert extracted['outTemp'] == sum_t / count_t
max_wind = 0
max_dir = None
@@ -162,14 +166,14 @@ class AccumTest(unittest.TestCase):
if rec['windGust'] is not None and rec['windGust'] > max_wind:
max_wind = rec['windGust']
max_dir = rec['windGustDir']
self.assertEqual(extracted['windGust'], max_wind)
self.assertEqual(extracted['windGustDir'], max_dir)
assert extracted['windGust'] == max_wind
assert extracted['windGustDir'] == max_dir
rain_sum = 0
for rec in self.dataset:
if rec['rain'] is not None:
rain_sum += rec['rain']
self.assertEqual(extracted['rain'], rain_sum)
assert extracted['rain'] == rain_sum
def test_Accum_with_string(self):
"""Test records with string literals in them."""
@@ -189,18 +193,15 @@ class AccumTest(unittest.TestCase):
accum.addRecord(record)
# The value extracted for the string should be the last one seen
rec = accum.getRecord()
self.assertEqual(rec['stringType'], "AString%d" % (len(self.dataset) - 1))
assert rec['stringType'] == "AString%d" % (len(self.dataset) - 1)
def test_Accum_unit_change(self):
# Change the units used by a record mid-stream
self.dataset[5]['usUnits'] = weewx.METRICWX
# This should result in a ValueError
with self.assertRaises(ValueError):
with pytest.raises(ValueError):
accum = weewx.accum.Accum(TimeSpan(start_ts, stop_ts))
for record in self.dataset:
accum.addRecord(record)
if __name__ == '__main__':
unittest.main()

View File

@@ -5,365 +5,330 @@
#
"""Test aggregate functions."""
import logging
import math
import os.path
import sys
import time
import unittest
import configobj
import pytest
import weedb
import gen_fake_data
import weeutil.logger
import weewx
import weewx.manager
import weewx.xtypes
from parameters import synthetic_dict
from weeutil.weeutil import TimeSpan
from weewx.units import ValueTuple
os.environ['TZ'] = 'America/Los_Angeles'
time.tzset()
weewx.debug = 1
log = logging.getLogger(__name__)
# Set up logging using the defaults.
weeutil.logger.setup('weetest_aggregate')
# Find the configuration file. It's assumed to be in the same directory as me:
config_path = os.path.join(os.path.dirname(__file__), "testgen.conf")
def test_get_aggregate(config_dict):
# Use the same function to test calculating aggregations from the main archive file, as
# well as from the daily summaries:
examine_object(config_dict, weewx.xtypes.ArchiveTable)
examine_object(config_dict, weewx.xtypes.DailySummaries)
class TestAggregate(unittest.TestCase):
def examine_object(weewx_dict, aggregate_obj):
with weewx.manager.open_manager_with_config(weewx_dict, 'wx_binding') as db_manager:
month_start_tt = (2010, 3, 1, 0, 0, 0, 0, 0, -1)
month_stop_tt = (2010, 4, 1, 0, 0, 0, 0, 0, -1)
start_ts = time.mktime(month_start_tt)
stop_ts = time.mktime(month_stop_tt)
def setUp(self):
global config_path
avg_vt = aggregate_obj.get_aggregate('outTemp', TimeSpan(start_ts, stop_ts), 'avg',
db_manager)
assert avg_vt[0] == pytest.approx(28.77, abs=0.01)
assert avg_vt[1] == 'degree_F'
assert avg_vt[2] == 'group_temperature'
try:
self.config_dict = configobj.ConfigObj(config_path, file_error=True, encoding='utf-8')
except IOError:
sys.stderr.write("Unable to open configuration file %s" % config_path)
# Reraise the exception (this will eventually cause the program to exit)
raise
except configobj.ConfigObjError:
sys.stderr.write("Error while parsing configuration file %s" % config_path)
raise
max_vt = aggregate_obj.get_aggregate('outTemp', TimeSpan(start_ts, stop_ts),
'max', db_manager)
assert max_vt[0] == pytest.approx(58.88, abs=0.01)
maxtime_vt = aggregate_obj.get_aggregate('outTemp', TimeSpan(start_ts, stop_ts),
'maxtime', db_manager)
assert maxtime_vt[0] == 1270076400
# This will generate the test databases if necessary. Use the SQLite database: it's faster.
gen_fake_data.configDatabases(self.config_dict, database_type='sqlite')
min_vt = aggregate_obj.get_aggregate('outTemp', TimeSpan(start_ts, stop_ts),
'min', db_manager)
assert min_vt[0] == pytest.approx(-1.01, abs=0.01)
mintime_vt = aggregate_obj.get_aggregate('outTemp', TimeSpan(start_ts, stop_ts),
'mintime', db_manager)
assert mintime_vt[0] == 1267441200
def tearDown(self):
pass
count_vt = aggregate_obj.get_aggregate('outTemp', TimeSpan(start_ts, stop_ts),
'count', db_manager)
assert count_vt[0] == 1465
def test_get_aggregate(self):
# Use the same function to test calculating aggregations from the main archive file, as
# well as from the daily summaries:
self.examine_object(weewx.xtypes.ArchiveTable)
self.examine_object(weewx.xtypes.DailySummaries)
sum_vt = aggregate_obj.get_aggregate('rain', TimeSpan(start_ts, stop_ts),
'sum', db_manager)
assert sum_vt[0] == pytest.approx(10.24, abs=0.01)
def examine_object(self, aggregate_obj):
with weewx.manager.open_manager_with_config(self.config_dict, 'wx_binding') as db_manager:
month_start_tt = (2010, 3, 1, 0, 0, 0, 0, 0, -1)
month_stop_tt = (2010, 4, 1, 0, 0, 0, 0, 0, -1)
start_ts = time.mktime(month_start_tt)
stop_ts = time.mktime(month_stop_tt)
avg_vt = aggregate_obj.get_aggregate('outTemp', TimeSpan(start_ts, stop_ts), 'avg',
db_manager)
self.assertAlmostEqual(avg_vt[0], 28.77, 2)
self.assertEqual(avg_vt[1], 'degree_F')
self.assertEqual(avg_vt[2], 'group_temperature')
max_vt = aggregate_obj.get_aggregate('outTemp', TimeSpan(start_ts, stop_ts),
'max', db_manager)
self.assertAlmostEqual(max_vt[0], 58.88, 2)
maxtime_vt = aggregate_obj.get_aggregate('outTemp', TimeSpan(start_ts, stop_ts),
'maxtime', db_manager)
self.assertEqual(maxtime_vt[0], 1270076400)
min_vt = aggregate_obj.get_aggregate('outTemp', TimeSpan(start_ts, stop_ts),
'min', db_manager)
self.assertAlmostEqual(min_vt[0], -1.01, 2)
mintime_vt = aggregate_obj.get_aggregate('outTemp', TimeSpan(start_ts, stop_ts),
'mintime', db_manager)
self.assertEqual(mintime_vt[0], 1267441200)
count_vt = aggregate_obj.get_aggregate('outTemp', TimeSpan(start_ts, stop_ts),
'count', db_manager)
self.assertEqual(count_vt[0], 1465)
sum_vt = aggregate_obj.get_aggregate('rain', TimeSpan(start_ts, stop_ts),
'sum', db_manager)
self.assertAlmostEqual(sum_vt[0], 10.24, 2)
not_null_vt = aggregate_obj.get_aggregate('outTemp', TimeSpan(start_ts, stop_ts),
'not_null', db_manager)
self.assertTrue(not_null_vt[0])
self.assertEqual(not_null_vt[1], 'boolean')
self.assertEqual(not_null_vt[2], 'group_boolean')
null_vt = aggregate_obj.get_aggregate('inTemp', TimeSpan(start_ts, stop_ts),
not_null_vt = aggregate_obj.get_aggregate('outTemp', TimeSpan(start_ts, stop_ts),
'not_null', db_manager)
self.assertFalse(null_vt[0])
assert not_null_vt[0]
assert not_null_vt[1] == 'boolean'
assert not_null_vt[2] == 'group_boolean'
# Values for inTemp in the test database are null for early May, but not null for later
# in the month. So, for all of May, the aggregate 'not_null' should be True.
null_start_ts = time.mktime((2010, 5, 1, 0, 0, 0, 0, 0, -1))
null_stop_ts = time.mktime((2010, 6, 1, 0, 0, 0, 0, 0, -1))
null_vt = aggregate_obj.get_aggregate('inTemp', TimeSpan(null_start_ts, null_stop_ts),
'not_null', db_manager)
self.assertTrue(null_vt[0])
null_vt = aggregate_obj.get_aggregate('inTemp', TimeSpan(start_ts, stop_ts),
'not_null', db_manager)
assert not null_vt[0]
# The ArchiveTable version has a few extra aggregate types:
if aggregate_obj == weewx.xtypes.ArchiveTable:
first_vt = aggregate_obj.get_aggregate('outTemp', TimeSpan(start_ts, stop_ts),
'first', db_manager)
# Get the timestamp of the first record inside the month
ts = start_ts + gen_fake_data.interval
rec = db_manager.getRecord(ts)
self.assertEqual(first_vt[0], rec['outTemp'])
# Values for inTemp in the test database are null for early May, but not null for later
# in the month. So, for all of May, the aggregate 'not_null' should be True.
null_start_ts = time.mktime((2010, 5, 1, 0, 0, 0, 0, 0, -1))
null_stop_ts = time.mktime((2010, 6, 1, 0, 0, 0, 0, 0, -1))
null_vt = aggregate_obj.get_aggregate('inTemp', TimeSpan(null_start_ts, null_stop_ts),
'not_null', db_manager)
assert null_vt[0]
first_time_vt = aggregate_obj.get_aggregate('outTemp', TimeSpan(start_ts, stop_ts),
'firsttime',
db_manager)
self.assertEqual(first_time_vt[0], ts)
# The ArchiveTable version has a few extra aggregate types:
if aggregate_obj == weewx.xtypes.ArchiveTable:
first_vt = aggregate_obj.get_aggregate('outTemp', TimeSpan(start_ts, stop_ts),
'first', db_manager)
# Get the timestamp of the first record inside the month
ts = start_ts + synthetic_dict['interval']
rec = db_manager.getRecord(ts)
assert first_vt[0] == rec['outTemp']
last_vt = aggregate_obj.get_aggregate('outTemp', TimeSpan(start_ts, stop_ts),
'last', db_manager)
# Get the timestamp of the last record of the month
rec = db_manager.getRecord(stop_ts)
self.assertEqual(last_vt[0], rec['outTemp'])
first_time_vt = aggregate_obj.get_aggregate('outTemp', TimeSpan(start_ts, stop_ts),
'firsttime',
db_manager)
assert first_time_vt[0] == ts
last_time_vt = aggregate_obj.get_aggregate('outTemp', TimeSpan(start_ts, stop_ts),
'lasttime', db_manager)
self.assertEqual(last_time_vt[0], stop_ts)
last_vt = aggregate_obj.get_aggregate('outTemp', TimeSpan(start_ts, stop_ts),
'last', db_manager)
# Get the timestamp of the last record of the month
rec = db_manager.getRecord(stop_ts)
assert last_vt[0] == rec['outTemp']
# Use 'dateTime' to check 'diff' and 'tderiv'. The calculations are super easy.
diff_vt = aggregate_obj.get_aggregate('dateTime', TimeSpan(start_ts, stop_ts),
'diff', db_manager)
self.assertEqual(diff_vt[0], stop_ts - start_ts)
last_time_vt = aggregate_obj.get_aggregate('outTemp', TimeSpan(start_ts, stop_ts),
'lasttime', db_manager)
assert last_time_vt[0] == stop_ts
tderiv_vt = aggregate_obj.get_aggregate('dateTime', TimeSpan(start_ts, stop_ts),
'tderiv', db_manager)
self.assertAlmostEqual(tderiv_vt[0], 1.0)
# Use 'dateTime' to check 'diff' and 'tderiv'. The calculations are super easy.
diff_vt = aggregate_obj.get_aggregate('dateTime', TimeSpan(start_ts, stop_ts),
'diff', db_manager)
assert diff_vt[0] == stop_ts - start_ts
def test_AggregateDaily(self):
"""Test special aggregates that can be used against the daily summaries."""
with weewx.manager.open_manager_with_config(self.config_dict, 'wx_binding') as db_manager:
month_start_tt = (2010, 3, 1, 0, 0, 0, 0, 0, -1)
month_stop_tt = (2010, 4, 1, 0, 0, 0, 0, 0, -1)
start_ts = time.mktime(month_start_tt)
stop_ts = time.mktime(month_stop_tt)
tderiv_vt = aggregate_obj.get_aggregate('dateTime', TimeSpan(start_ts, stop_ts),
'tderiv', db_manager)
assert tderiv_vt[0] == pytest.approx(1.0)
min_ge_vt = weewx.xtypes.DailySummaries.get_aggregate('outTemp',
TimeSpan(start_ts, stop_ts),
'min_ge',
db_manager,
val=ValueTuple(15,
'degree_F',
'group_temperature'))
self.assertEqual(min_ge_vt[0], 6)
min_le_vt = weewx.xtypes.DailySummaries.get_aggregate('outTemp',
TimeSpan(start_ts, stop_ts),
'min_le',
db_manager,
val=ValueTuple(0,
'degree_F',
'group_temperature'))
self.assertEqual(min_le_vt[0], 2)
def test_AggregateDaily(config_dict):
"""Test special aggregates that can be used against the daily summaries."""
with weewx.manager.open_manager_with_config(config_dict, 'wx_binding') as db_manager:
month_start_tt = (2010, 3, 1, 0, 0, 0, 0, 0, -1)
month_stop_tt = (2010, 4, 1, 0, 0, 0, 0, 0, -1)
start_ts = time.mktime(month_start_tt)
stop_ts = time.mktime(month_stop_tt)
minmax_vt = weewx.xtypes.DailySummaries.get_aggregate('outTemp',
TimeSpan(start_ts, stop_ts),
'minmax',
db_manager)
self.assertAlmostEqual(minmax_vt[0], 39.28, 2)
min_ge_vt = weewx.xtypes.DailySummaries.get_aggregate('outTemp',
TimeSpan(start_ts, stop_ts),
'min_ge',
db_manager,
val=ValueTuple(15,
'degree_F',
'group_temperature'))
assert min_ge_vt[0] == 6
max_wind_vt = weewx.xtypes.DailySummaries.get_aggregate('wind',
TimeSpan(start_ts, stop_ts),
'max',
db_manager)
self.assertAlmostEqual(max_wind_vt[0], 24.0, 2)
min_le_vt = weewx.xtypes.DailySummaries.get_aggregate('outTemp',
TimeSpan(start_ts, stop_ts),
'min_le',
db_manager,
val=ValueTuple(0,
'degree_F',
'group_temperature'))
assert min_le_vt[0] == 2
avg_wind_vt = weewx.xtypes.DailySummaries.get_aggregate('wind',
TimeSpan(start_ts, stop_ts),
'avg',
db_manager)
self.assertAlmostEqual(avg_wind_vt[0], 10.21, 2)
# Double check this last one against the average calculated from the archive
avg_wind_vt = weewx.xtypes.ArchiveTable.get_aggregate('windSpeed',
TimeSpan(start_ts, stop_ts),
'avg',
db_manager)
self.assertAlmostEqual(avg_wind_vt[0], 10.21, 2)
minmax_vt = weewx.xtypes.DailySummaries.get_aggregate('outTemp',
TimeSpan(start_ts, stop_ts),
'minmax',
db_manager)
assert minmax_vt[0] == pytest.approx(39.28, abs=0.01)
vecavg_wind_vt = weewx.xtypes.DailySummaries.get_aggregate('wind',
TimeSpan(start_ts, stop_ts),
'vecavg',
db_manager)
self.assertAlmostEqual(vecavg_wind_vt[0], 5.14, 2)
max_wind_vt = weewx.xtypes.DailySummaries.get_aggregate('wind',
TimeSpan(start_ts, stop_ts),
'max',
db_manager)
assert max_wind_vt[0] == pytest.approx(24.0, abs=0.01)
vecdir_wind_vt = weewx.xtypes.DailySummaries.get_aggregate('wind',
TimeSpan(start_ts, stop_ts),
'vecdir',
db_manager)
self.assertAlmostEqual(vecdir_wind_vt[0], 88.77, 2)
avg_wind_vt = weewx.xtypes.DailySummaries.get_aggregate('wind',
TimeSpan(start_ts, stop_ts),
'avg',
db_manager)
assert avg_wind_vt[0] == pytest.approx(10.21, abs=0.01)
# Double check this last one against the average calculated from the archive
avg_wind_vt = weewx.xtypes.ArchiveTable.get_aggregate('windSpeed',
TimeSpan(start_ts, stop_ts),
'avg',
db_manager)
assert avg_wind_vt[0] == pytest.approx(10.21, abs=0.01)
def test_get_aggregate_heatcool(self):
with weewx.manager.open_manager_with_config(self.config_dict, 'wx_binding') as db_manager:
month_start_tt = (2010, 3, 1, 0, 0, 0, 0, 0, -1)
month_stop_tt = (2010, 4, 1, 0, 0, 0, 0, 0, -1)
start_ts = time.mktime(month_start_tt)
stop_ts = time.mktime(month_stop_tt)
# First, with the default heating base:
heatdeg = weewx.xtypes.AggregateHeatCool.get_aggregate('heatdeg',
vecavg_wind_vt = weewx.xtypes.DailySummaries.get_aggregate('wind',
TimeSpan(start_ts, stop_ts),
'sum',
'vecavg',
db_manager)
self.assertAlmostEqual(heatdeg[0], 1123.12, 2)
# Now with an explicit heating base:
heatdeg = weewx.xtypes.AggregateHeatCool.get_aggregate('heatdeg',
assert vecavg_wind_vt[0] == pytest.approx(5.14, abs=0.01)
vecdir_wind_vt = weewx.xtypes.DailySummaries.get_aggregate('wind',
TimeSpan(start_ts, stop_ts),
'sum',
db_manager,
skin_dict={
'Units': {'DegreeDays': {
'heating_base': (
60.0, "degree_F",
"group_temperature")
}}})
self.assertAlmostEqual(heatdeg[0], 968.12, 2)
'vecdir',
db_manager)
assert vecdir_wind_vt[0] == pytest.approx(88.77, abs=0.01)
def test_get_aggregate_windvec(self):
"""Test calculating special type 'windvec' using a variety of methods."""
with weewx.manager.open_manager_with_config(self.config_dict, 'wx_binding') as db_manager:
month_start_tt = (2010, 3, 1, 0, 0, 0, 0, 0, -1)
month_stop_tt = (2010, 3, 2, 0, 0, 0, 0, 0, -1)
start_ts = time.mktime(month_start_tt)
stop_ts = time.mktime(month_stop_tt)
# Calculate the daily wind for 1-March-2010 using the daily summaries, the main archive
# table, and letting get_aggregate() choose.
for func in [
weewx.xtypes.WindVecDaily.get_aggregate,
weewx.xtypes.WindVec.get_aggregate,
weewx.xtypes.get_aggregate
]:
windvec = func('windvec', TimeSpan(start_ts, stop_ts), 'avg', db_manager)
self.assertAlmostEqual(windvec[0].real, -1.390, 3)
self.assertAlmostEqual(windvec[0].imag, 3.250, 3)
self.assertEqual(windvec[1:3], ('mile_per_hour', 'group_speed'))
def test_get_aggregate_heatcool(config_dict):
with weewx.manager.open_manager_with_config(config_dict, 'wx_binding') as db_manager:
month_start_tt = (2010, 3, 1, 0, 0, 0, 0, 0, -1)
month_stop_tt = (2010, 4, 1, 0, 0, 0, 0, 0, -1)
start_ts = time.mktime(month_start_tt)
stop_ts = time.mktime(month_stop_tt)
# Calculate the wind vector for the hour starting at 1-06-2010 15:00
hour_start_tt = (2010, 1, 6, 15, 0, 0, 0, 0, -1)
hour_stop_tt = (2010, 1, 6, 16, 0, 0, 0, 0, -1)
hour_start_ts = time.mktime(hour_start_tt)
hour_stop_ts = time.mktime(hour_stop_tt)
vt = weewx.xtypes.WindVec.get_aggregate('windvec',
TimeSpan(hour_start_ts, hour_stop_ts),
'max', db_manager)
self.assertAlmostEqual(abs(vt[0]), 15.281, 3)
self.assertAlmostEqual(vt[0].real, 8.069, 3)
self.assertAlmostEqual(vt[0].imag, -12.976, 3)
vt = weewx.xtypes.WindVec.get_aggregate('windgustvec',
TimeSpan(hour_start_ts, hour_stop_ts),
'max', db_manager)
self.assertAlmostEqual(abs(vt[0]), 18.337, 3)
self.assertAlmostEqual(vt[0].real, 9.683, 3)
self.assertAlmostEqual(vt[0].imag, -15.572, 3)
# First, with the default heating base:
heatdeg = weewx.xtypes.AggregateHeatCool.get_aggregate('heatdeg',
TimeSpan(start_ts, stop_ts),
'sum',
db_manager)
assert heatdeg[0] == pytest.approx(1123.12, abs=0.01)
# Now with an explicit heating base:
heatdeg = weewx.xtypes.AggregateHeatCool.get_aggregate('heatdeg',
TimeSpan(start_ts, stop_ts),
'sum',
db_manager,
skin_dict={
'Units': {'DegreeDays': {
'heating_base': (
60.0, "degree_F",
"group_temperature")
}}})
assert heatdeg[0] == pytest.approx(968.12, abs=0.01)
vt = weewx.xtypes.WindVec.get_aggregate('windvec',
TimeSpan(hour_start_ts, hour_stop_ts),
'not_null', db_manager)
self.assertTrue(vt[0])
self.assertEqual(vt[1], 'boolean')
self.assertEqual(vt[2], 'group_boolean')
def test_get_aggregate_expression(self):
"""Test using an expression in an aggregate"""
with weewx.manager.open_manager_with_config(self.config_dict, 'wx_binding') as db_manager:
month_start_tt = (2010, 7, 1, 0, 0, 0, 0, 0, -1)
month_stop_tt = (2010, 8, 1, 0, 0, 0, 0, 0, -1)
start_ts = time.mktime(month_start_tt)
stop_ts = time.mktime(month_stop_tt)
def test_get_aggregate_windvec(config_dict):
"""Test calculating special type 'windvec' using a variety of methods."""
with weewx.manager.open_manager_with_config(config_dict, 'wx_binding') as db_manager:
month_start_tt = (2010, 3, 1, 0, 0, 0, 0, 0, -1)
month_stop_tt = (2010, 3, 2, 0, 0, 0, 0, 0, -1)
start_ts = time.mktime(month_start_tt)
stop_ts = time.mktime(month_stop_tt)
# This one is a valid expression:
value = weewx.xtypes.get_aggregate('rain-ET', TimeSpan(start_ts, stop_ts),
# Calculate the daily wind for 1-March-2010 using the daily summaries, the main archive
# table, and letting get_aggregate() choose.
for func in [
weewx.xtypes.WindVecDaily.get_aggregate,
weewx.xtypes.WindVec.get_aggregate,
weewx.xtypes.get_aggregate
]:
windvec = func('windvec', TimeSpan(start_ts, stop_ts), 'avg', db_manager)
assert windvec[0].real == pytest.approx(-1.390, abs=0.001)
assert windvec[0].imag == pytest.approx(3.250, abs=0.001)
assert windvec[1:3] == ('mile_per_hour', 'group_speed')
# Calculate the wind vector for the hour starting at 1-06-2010 15:00
hour_start_tt = (2010, 1, 6, 15, 0, 0, 0, 0, -1)
hour_stop_tt = (2010, 1, 6, 16, 0, 0, 0, 0, -1)
hour_start_ts = time.mktime(hour_start_tt)
hour_stop_ts = time.mktime(hour_stop_tt)
vt = weewx.xtypes.WindVec.get_aggregate('windvec',
TimeSpan(hour_start_ts, hour_stop_ts),
'max', db_manager)
assert abs(vt[0]) == pytest.approx(15.281, abs=0.001)
assert vt[0].real == pytest.approx(8.069, abs=0.001)
assert vt[0].imag == pytest.approx(-12.976, abs=0.001)
vt = weewx.xtypes.WindVec.get_aggregate('windgustvec',
TimeSpan(hour_start_ts, hour_stop_ts),
'max', db_manager)
assert abs(vt[0]) == pytest.approx(18.337, abs=0.001)
assert vt[0].real == pytest.approx(9.683, abs=0.001)
assert vt[0].imag == pytest.approx(-15.572, abs=0.001)
vt = weewx.xtypes.WindVec.get_aggregate('windvec',
TimeSpan(hour_start_ts, hour_stop_ts),
'not_null', db_manager)
assert vt[0]
assert vt[1] == 'boolean'
assert vt[2] == 'group_boolean'
def test_get_aggregate_expression(config_dict):
"""Test using an expression in an aggregate"""
with weewx.manager.open_manager_with_config(config_dict, 'wx_binding') as db_manager:
month_start_tt = (2010, 7, 1, 0, 0, 0, 0, 0, -1)
month_stop_tt = (2010, 8, 1, 0, 0, 0, 0, 0, -1)
start_ts = time.mktime(month_start_tt)
stop_ts = time.mktime(month_stop_tt)
# This one is a valid expression:
value = weewx.xtypes.get_aggregate('rain-ET', TimeSpan(start_ts, stop_ts),
'sum', db_manager)
assert value[0] == pytest.approx(2.94, abs=0.01)
# This one uses a nonsense variable:
with pytest.raises(weewx.UnknownAggregation):
value = weewx.xtypes.get_aggregate('rain-foo', TimeSpan(start_ts, stop_ts),
'sum', db_manager)
self.assertAlmostEqual(value[0], 2.94, 2)
# This one uses a nonsense variable:
with self.assertRaises(weewx.UnknownAggregation):
value = weewx.xtypes.get_aggregate('rain-foo', TimeSpan(start_ts, stop_ts),
'sum', db_manager)
# A valid function
value = weewx.xtypes.get_aggregate('max(rain-ET, 0)', TimeSpan(start_ts, stop_ts),
'sum', db_manager)
assert value[0] == pytest.approx(9.57, abs=0.01)
# A valid function
value = weewx.xtypes.get_aggregate('max(rain-ET, 0)', TimeSpan(start_ts, stop_ts),
# This one uses a nonsense function
with pytest.raises(weedb.OperationalError):
value = weewx.xtypes.get_aggregate('foo(rain-ET)', TimeSpan(start_ts, stop_ts),
'sum', db_manager)
self.assertAlmostEqual(value[0], 9.57, 2)
# This one uses a nonsense function
with self.assertRaises(weedb.OperationalError):
value = weewx.xtypes.get_aggregate('foo(rain-ET)', TimeSpan(start_ts, stop_ts),
'sum', db_manager)
def test_first_wind(self):
"""Test getting the first non-null wind record in a time range."""
with weewx.manager.open_manager_with_config(self.config_dict, 'wx_binding') as db_manager:
# Get the first value for 2-Aug-2010. This date was chosen because the wind speed of
# the very first record of the day (at 00:30:00) is actually null, so the next value
# (at 01:00:00) should be the one chosen.
day_start_tt = (2010, 8, 2, 0, 0, 0, 0, 0, -1)
day_stop_tt = (2010, 8, 3, 0, 0, 0, 0, 0, -1)
start_ts = time.mktime(day_start_tt)
stop_ts = time.mktime(day_stop_tt)
# Check the premise of the test, aas well as get the expected results
results = [x for x in db_manager.genSql("SELECT windSpeed, windDir FROM archive "
"WHERE dateTime > ? "
"ORDER BY dateTime ASC LIMIT 2", (start_ts,))]
# We expect the first datum to be null
self.assertIsNone(results[0][0])
# This is the expected value: the 2nd datum
windSpeed, windDir = results[1]
expected = complex(windSpeed * math.cos(math.radians(90.0 - windDir)),
windSpeed * math.sin(math.radians(90.0 - windDir)))
value = weewx.xtypes.WindVec.get_aggregate('windvec',
TimeSpan(start_ts, stop_ts),
'first', db_manager)
self.assertEqual(value[0], expected)
self.assertEqual(value[1], 'mile_per_hour')
self.assertEqual(value[2], 'group_speed')
def test_last_wind(self):
"""Test getting the last non-null wind record in a time range."""
with weewx.manager.open_manager_with_config(self.config_dict, 'wx_binding') as db_manager:
# Get the last value for 18-Apr-2010. This date was chosen because the wind speed of
# the very last record of the day (at 19-Apr-2010 00:00:00) is actually null, so the
# previous value (at 18-Apr-2010 23:30:00) should be the one chosen.
day_start_tt = (2010, 4, 18, 0, 0, 0, 0, 0, -1)
day_stop_tt = (2010, 4, 19, 0, 0, 0, 0, 0, -1)
start_ts = time.mktime(day_start_tt)
stop_ts = time.mktime(day_stop_tt)
# Check the premise of the test, as well as get the expected results
results = [x for x in db_manager.genSql("SELECT windSpeed, windDir FROM archive "
"WHERE dateTime <= ? "
"ORDER BY dateTime DESC LIMIT 2", (stop_ts,))]
# We expect the first record (which is the last record of the day) to be null
self.assertIsNone(results[0][0])
# This is the expected value: the 2nd record
windSpeed, windDir = results[1]
expected = complex(windSpeed * math.cos(math.radians(90.0 - windDir)),
windSpeed * math.sin(math.radians(90.0 - windDir)))
value = weewx.xtypes.WindVec.get_aggregate('windvec',
TimeSpan(start_ts, stop_ts),
'last', db_manager)
self.assertAlmostEqual(value[0], expected)
self.assertEqual(value[1], 'mile_per_hour')
self.assertEqual(value[2], 'group_speed')
if __name__ == '__main__':
unittest.main()
def test_first_wind(config_dict):
"""Test getting the first non-null wind record in a time range."""
with weewx.manager.open_manager_with_config(config_dict, 'wx_binding') as db_manager:
# Get the first value for 2-Aug-2010. This date was chosen because the wind speed of
# the very first record of the day (at 00:30:00) is actually null, so the next value
# (at 01:00:00) should be the one chosen.
day_start_tt = (2010, 8, 2, 0, 0, 0, 0, 0, -1)
day_stop_tt = (2010, 8, 3, 0, 0, 0, 0, 0, -1)
start_ts = time.mktime(day_start_tt)
stop_ts = time.mktime(day_stop_tt)
# Check the premise of the test, aas well as get the expected results
results = [x for x in db_manager.genSql("SELECT windSpeed, windDir FROM archive "
"WHERE dateTime > ? "
"ORDER BY dateTime ASC LIMIT 2", (start_ts,))]
# We expect the first datum to be null
assert results[0][0] is None
# This is the expected value: the 2nd datum
windSpeed, windDir = results[1]
expected = complex(windSpeed * math.cos(math.radians(90.0 - windDir)),
windSpeed * math.sin(math.radians(90.0 - windDir)))
value = weewx.xtypes.WindVec.get_aggregate('windvec',
TimeSpan(start_ts, stop_ts),
'first', db_manager)
assert value[0] == expected
assert value[1] == 'mile_per_hour'
assert value[2] == 'group_speed'
def test_last_wind(config_dict):
"""Test getting the last non-null wind record in a time range."""
with weewx.manager.open_manager_with_config(config_dict, 'wx_binding') as db_manager:
# Get the last value for 18-Apr-2010. This date was chosen because the wind speed of
# the very last record of the day (at 19-Apr-2010 00:00:00) is actually null, so the
# previous value (at 18-Apr-2010 23:30:00) should be the one chosen.
day_start_tt = (2010, 4, 18, 0, 0, 0, 0, 0, -1)
day_stop_tt = (2010, 4, 19, 0, 0, 0, 0, 0, -1)
start_ts = time.mktime(day_start_tt)
stop_ts = time.mktime(day_stop_tt)
# Check the premise of the test, as well as get the expected results
results = [x for x in db_manager.genSql("SELECT windSpeed, windDir FROM archive "
"WHERE dateTime <= ? "
"ORDER BY dateTime DESC LIMIT 2", (stop_ts,))]
# We expect the first record (which is the last record of the day) to be null
assert results[0][0] is None
# This is the expected value: the 2nd record
windSpeed, windDir = results[1]
expected = complex(windSpeed * math.cos(math.radians(90.0 - windDir)),
windSpeed * math.sin(math.radians(90.0 - windDir)))
value = weewx.xtypes.WindVec.get_aggregate('windvec',
TimeSpan(start_ts, stop_ts),
'last', db_manager)
assert value[0] == pytest.approx(expected)
assert value[1] == 'mile_per_hour'
assert value[2] == 'group_speed'

View File

@@ -95,158 +95,169 @@ def test_get_series_archive_outTemp(config_dict):
assert len(data_vec[0]) == (stop_ts - start_ts) / interval
# def test_get_series_daily_agg_rain_sum(db_manager):
# """Test a series of daily aggregated rain totals, run against the daily summaries"""
# # Calculate the total daily rain
# start_vec, stop_vec, data_vec \
# = weewx.xtypes.DailySummaries.get_series('rain',
# TimeSpan(month_start_ts, month_stop_ts),
# db_manager,
# 'sum',
# 'day')
# # March has 30 days.
# assert len(start_vec[0]) == 30 + 1
# assert len(stop_vec[0]) == 30 + 1
# assert (["%.2f" % d for d in data_vec[0]], data_vec[1], data_vec[2]) \
# == (["%.2f" % d for d in expected_daily_rain_sum], 'inch', 'group_rain')
#
# def test_get_series_archive_agg_rain_sum(db_manager):
# """Test a series of daily aggregated rain totals, run against the main archive table"""
# # Calculate the total daily rain
# start_vec, stop_vec, data_vec \
# = weewx.xtypes.ArchiveTable.get_series('rain',
# TimeSpan(month_start_ts, month_stop_ts),
# db_manager,
# 'sum',
# 'day')
# # March has 30 days.
# assert len(start_vec[0]) == 30 + 1
# assert len(stop_vec[0]) == 30 + 1
# assert (["%.2f" % d for d in data_vec[0]], data_vec[1], data_vec[2]) \
# == (["%.2f" % d for d in expected_daily_rain_sum], 'inch', 'group_rain')
#
# def test_get_series_archive_agg_rain_cum(db_manager):
# """Test a series of daily cumulative rain totals, run against the main archive table."""
# # Calculate the cumulative total daily rain
# start_vec, stop_vec, data_vec \
# = weewx.xtypes.ArchiveTable.get_series('rain',
# TimeSpan(month_start_ts, month_stop_ts),
# db_manager,
# 'cumulative',
# 24 * 3600)
# # March has 30 days.
# assert len(start_vec[0]) == 30 + 1
# assert len(stop_vec[0]) == 30 + 1
# right_answer = functools.reduce(lambda v, x: v + [v[-1] + x], expected_daily_rain_sum, [0])[1:]
# assert (["%.2f" % d for d in data_vec[0]], data_vec[1], data_vec[2]) \
# == (["%.2f" % d for d in right_answer], 'inch', 'group_rain')
#
# def test_get_series_archive_windvec(db_manager):
# """Test a series of 'windvec', with no aggregation, run against the main archive table"""
# # Get a series of wind values
# start_vec, stop_vec, data_vec \
# = weewx.xtypes.WindVec.get_series('windvec',
# TimeSpan(start_ts, stop_ts),
# db_manager)
# assert len(start_vec[0]) == (stop_ts - start_ts) / interval + 1
# assert len(stop_vec[0]) == (stop_ts - start_ts) / interval + 1
# assert len(data_vec[0]) == (stop_ts - start_ts) / interval + 1
#
# def test_get_series_archive_agg_windvec_avg(db_manager):
# """Test a series of 'windvec', with 'avg' aggregation. This will exercise
# WindVec.get_series(0), which, in turn, will call WindVecDaily.get_aggregate() to get each
# individual aggregate value."""
# # Get a series of wind values
# start_vec, stop_vec, data_vec \
# = weewx.xtypes.WindVec.get_series('windvec',
# TimeSpan(month_start_ts, month_stop_ts),
# db_manager,
# 'avg',
# 24 * 3600)
# # March has 30 days.
# assert len(start_vec[0]) == 30 + 1
# assert len(stop_vec[0]) == 30 + 1
# assert (["(%.2f, %.2f)" % (x.real, x.imag) for x in data_vec[0]]) \
# == (["(%.2f, %.2f)" % (x[0], x[1]) for x in expected_daily_wind_avg])
#
# def test_get_series_archive_agg_windvec_last(db_manager):
# """Test a series of 'windvec', with 'last' aggregation. This will exercise
# WindVec.get_series(), which, in turn, will call WindVec.get_aggregate() to get each
# individual aggregate value."""
# # Get a series of wind values
# start_vec, stop_vec, data_vec \
# = weewx.xtypes.get_series('windvec',
# TimeSpan(month_start_ts, month_stop_ts),
# db_manager,
# 'last',
# 24 * 3600)
# # March has 30 days.
# assert len(start_vec[0]) == 30 + 1
# assert len(stop_vec[0]) == 30 + 1
# # The round(x, 2) + 0 is necessary to avoid 0.00 comparing different from -0.00.
# assert ["(%.2f, %.2f)" % (round(x.real, 2) + 0, round(x.imag, 2) + 0) for x in data_vec[0]] \
# == ["(%.2f, %.2f)" % (x[0], x[1]) for x in expected_daily_wind_last]
#
# def test_get_aggregate_windvec_last(db_manager):
# """Test getting a windvec aggregation over a period that does not fall on midnight
# boundaries."""
#
# # This time span was chosen because it includes a null value.
# start_tt = (2010, 3, 2, 12, 0, 0, 0, 0, -1)
# start = time.mktime(start_tt) # = 1267560000
# stop = start + 6 * 3600
# # Double check that the null value is in there
# assert db_manager.getRecord(1267570800)['windSpeed'] is None
#
# # Get a simple 'avg' aggregation over this period
# val_t = weewx.xtypes.WindVec.get_aggregate('windvec',
# TimeSpan(start, stop),
# 'avg',
# db_manager)
# assert type(val_t[0]) is complex
# assert val_t[0].real == pytest.approx(15.37441)
# assert val_t[0].imag == pytest.approx(9.79138)
# assert val_t[1] == 'mile_per_hour'
# assert val_t[2] == 'group_speed'
#
# def test_get_series_on_the_fly(db_manager):
# """Test a series of a user-defined type with no aggregation,
# run against the archive table."""
#
# # This time span was chosen because it includes a null for outTemp at 0330
# start_tt = (2010, 3, 2, 2, 0, 0, 0, 0, -1)
# stop_tt = (2010, 3, 2, 5, 0, 0, 0, 0, -1)
# start = time.mktime(start_tt) # == 1267524000
# stop = time.mktime(stop_tt) # == 1267534800
# # Make sure the null outTemp is in there
# assert db_manager.getRecord(1267529400)['outTemp'] is None
#
# start_vec, stop_vec, data_vec \
# = weewx.xtypes.get_series('vapor_p',
# TimeSpan(start, stop),
# db_manager)
#
# for actual, expected in zip(data_vec[0], expected_vapor_pressures):
# assert actual == pytest.approx(expected)
# assert data_vec[1] == 'inHg'
# assert data_vec[2] == 'group_pressure'
#
# def test_get_aggregate_series_on_the_fly(db_manager):
# """Test a series of a user-defined type with aggregation, run against the archive table."""
#
# start_tt = (2010, 3, 1, 0, 0, 0, 0, 0, -1)
# stop_tt = (2010, 4, 1, 0, 0, 0, 0, 0, -1)
# start = time.mktime(start_tt)
# stop = time.mktime(stop_tt)
#
# start_vec, stop_vec, data_vec \
# = weewx.xtypes.get_series('vapor_p',
# TimeSpan(start, stop),
# db_manager,
# aggregate_type='avg',
# aggregate_interval=6 * 3600)
#
# for actual, expected in zip(data_vec[0], expected_aggregate_vapor_pressures):
# assert actual == pytest.approx(expected, abs=1e-6)
# assert data_vec[1] == 'inHg'
# assert data_vec[2] == 'group_pressure'
def test_get_series_daily_agg_rain_sum(config_dict):
"""Test a series of daily aggregated rain totals, run against the daily summaries"""
# Calculate the total daily rain
with weewx.manager.open_manager_with_config(config_dict, 'wx_binding') as db_manager:
start_vec, stop_vec, data_vec \
= weewx.xtypes.DailySummaries.get_series('rain',
TimeSpan(month_start_ts, month_stop_ts),
db_manager,
'sum',
'day')
# March has 30 days.
assert len(start_vec[0]) == 30 + 1
assert len(stop_vec[0]) == 30 + 1
assert (["%.2f" % d for d in data_vec[0]], data_vec[1], data_vec[2]) \
== (["%.2f" % d for d in expected_daily_rain_sum], 'inch', 'group_rain')
def test_get_series_archive_agg_rain_sum(config_dict):
"""Test a series of daily aggregated rain totals, run against the main archive table"""
# Calculate the total daily rain
with weewx.manager.open_manager_with_config(config_dict, 'wx_binding') as db_manager:
start_vec, stop_vec, data_vec \
= weewx.xtypes.ArchiveTable.get_series('rain',
TimeSpan(month_start_ts, month_stop_ts),
db_manager,
'sum',
'day')
# March has 30 days.
assert len(start_vec[0]) == 30 + 1
assert len(stop_vec[0]) == 30 + 1
assert (["%.2f" % d for d in data_vec[0]], data_vec[1], data_vec[2]) \
== (["%.2f" % d for d in expected_daily_rain_sum], 'inch', 'group_rain')
def test_get_series_archive_agg_rain_cum(config_dict):
"""Test a series of daily cumulative rain totals, run against the main archive table."""
# Calculate the cumulative total daily rain
with weewx.manager.open_manager_with_config(config_dict, 'wx_binding') as db_manager:
start_vec, stop_vec, data_vec \
= weewx.xtypes.ArchiveTable.get_series('rain',
TimeSpan(month_start_ts, month_stop_ts),
db_manager,
'cumulative',
24 * 3600)
# March has 30 days.
assert len(start_vec[0]) == 30 + 1
assert len(stop_vec[0]) == 30 + 1
right_answer = functools.reduce(lambda v, x: v + [v[-1] + x], expected_daily_rain_sum, [0])[1:]
assert (["%.2f" % d for d in data_vec[0]], data_vec[1], data_vec[2]) \
== (["%.2f" % d for d in right_answer], 'inch', 'group_rain')
def test_get_series_archive_windvec(config_dict):
"""Test a series of 'windvec', with no aggregation, run against the main archive table"""
# Get a series of wind values
with weewx.manager.open_manager_with_config(config_dict, 'wx_binding') as db_manager:
start_vec, stop_vec, data_vec \
= weewx.xtypes.WindVec.get_series('windvec',
TimeSpan(start_ts, stop_ts),
db_manager)
assert len(start_vec[0]) == (stop_ts - start_ts) / interval + 1
assert len(stop_vec[0]) == (stop_ts - start_ts) / interval + 1
assert len(data_vec[0]) == (stop_ts - start_ts) / interval + 1
def test_get_series_archive_agg_windvec_avg(config_dict):
"""Test a series of 'windvec', with 'avg' aggregation. This will exercise
WindVec.get_series(0), which, in turn, will call WindVecDaily.get_aggregate() to get each
individual aggregate value."""
with weewx.manager.open_manager_with_config(config_dict, 'wx_binding') as db_manager:
# Get a series of wind values
start_vec, stop_vec, data_vec \
= weewx.xtypes.WindVec.get_series('windvec',
TimeSpan(month_start_ts, month_stop_ts),
db_manager,
'avg',
24 * 3600)
# March has 30 days.
assert len(start_vec[0]) == 30 + 1
assert len(stop_vec[0]) == 30 + 1
assert (["(%.2f, %.2f)" % (x.real, x.imag) for x in data_vec[0]]) \
== (["(%.2f, %.2f)" % (x[0], x[1]) for x in expected_daily_wind_avg])
def test_get_series_archive_agg_windvec_last(config_dict):
"""Test a series of 'windvec', with 'last' aggregation. This will exercise
WindVec.get_series(), which, in turn, will call WindVec.get_aggregate() to get each
individual aggregate value."""
# Get a series of wind values
with weewx.manager.open_manager_with_config(config_dict, 'wx_binding') as db_manager:
start_vec, stop_vec, data_vec \
= weewx.xtypes.get_series('windvec',
TimeSpan(month_start_ts, month_stop_ts),
db_manager,
'last',
24 * 3600)
# March has 30 days.
assert len(start_vec[0]) == 30 + 1
assert len(stop_vec[0]) == 30 + 1
# The round(x, 2) + 0 is necessary to avoid 0.00 comparing different from -0.00.
assert ["(%.2f, %.2f)" % (round(x.real, 2) + 0, round(x.imag, 2) + 0) for x in data_vec[0]] \
== ["(%.2f, %.2f)" % (x[0], x[1]) for x in expected_daily_wind_last]
def test_get_aggregate_windvec_last(config_dict):
"""Test getting a windvec aggregation over a period that does not fall on midnight
boundaries."""
# This time span was chosen because it includes a null value.
start_tt = (2010, 3, 2, 12, 0, 0, 0, 0, -1)
start = time.mktime(start_tt) # = 1267560000
stop = start + 6 * 3600
with weewx.manager.open_manager_with_config(config_dict, 'wx_binding') as db_manager:
# Double check that the null value is in there
assert db_manager.getRecord(1267570800)['windSpeed'] is None
# Get a simple 'avg' aggregation over this period
val_t = weewx.xtypes.WindVec.get_aggregate('windvec',
TimeSpan(start, stop),
'avg',
db_manager)
assert type(val_t[0]) is complex
assert val_t[0].real == pytest.approx(15.37441)
assert val_t[0].imag == pytest.approx(9.79138)
assert val_t[1] == 'mile_per_hour'
assert val_t[2] == 'group_speed'
def test_get_series_on_the_fly(config_dict):
"""Test a series of a user-defined type with no aggregation,
run against the archive table."""
# This time span was chosen because it includes a null for outTemp at 0330
start_tt = (2010, 3, 2, 2, 0, 0, 0, 0, -1)
stop_tt = (2010, 3, 2, 5, 0, 0, 0, 0, -1)
start = time.mktime(start_tt) # == 1267524000
stop = time.mktime(stop_tt) # == 1267534800
with weewx.manager.open_manager_with_config(config_dict, 'wx_binding') as db_manager:
# Make sure the null outTemp is in there
assert db_manager.getRecord(1267529400)['outTemp'] is None
start_vec, stop_vec, data_vec \
= weewx.xtypes.get_series('vapor_p',
TimeSpan(start, stop),
db_manager)
for actual, expected in zip(data_vec[0], expected_vapor_pressures):
assert actual == pytest.approx(expected)
assert data_vec[1] == 'inHg'
assert data_vec[2] == 'group_pressure'
def test_get_aggregate_series_on_the_fly(config_dict):
"""Test a series of a user-defined type with aggregation, run against the archive table."""
start_tt = (2010, 3, 1, 0, 0, 0, 0, 0, -1)
stop_tt = (2010, 4, 1, 0, 0, 0, 0, 0, -1)
start = time.mktime(start_tt)
stop = time.mktime(stop_tt)
with weewx.manager.open_manager_with_config(config_dict, 'wx_binding') as db_manager:
start_vec, stop_vec, data_vec \
= weewx.xtypes.get_series('vapor_p',
TimeSpan(start, stop),
db_manager,
aggregate_type='avg',
aggregate_interval=6 * 3600)
for actual, expected in zip(data_vec[0], expected_aggregate_vapor_pressures):
assert actual == pytest.approx(expected, abs=1e-6)
assert data_vec[1] == 'inHg'
assert data_vec[2] == 'group_pressure'