Got database and daily summary tests working.

Fixed bug in minimum wind speed in accumulator.
This commit is contained in:
Tom Keffer
2014-10-28 21:28:41 +00:00
parent 4d71118607
commit 9b9deb0397
6 changed files with 312 additions and 311 deletions

View File

@@ -20,6 +20,7 @@ using accumulateLeaves, so you can use overrides?
Move schemas to the extension that uses them.
Wind "low" values not getting added properly

View File

@@ -337,7 +337,8 @@ class Accum(dict):
self.init_type('wind')
# Then add to highs/lows, and to the running sum:
if add_hilo:
self['wind'].addHiLo((record.get('windGust'), record.get('windGustDir')), record['dateTime'])
self['wind'].addHiLo((record.get('windGust'), record.get('windGustDir')), record['dateTime'])
self['wind'].addHiLo((record.get('windSpeed'), record.get('windDir')), record['dateTime'])
self['wind'].addSum((record['windSpeed'], record.get('windDir')))
def check_units(self, record, obs_type, add_hilo):

View File

@@ -19,9 +19,8 @@ import syslog
import time
import schemas.wview
import weewx
import weedb
import weeutil.weeutil
import weewx.database
# One year of data:
start_tt = (2010,1,1,0,0,0,0,0,-1)
@@ -45,31 +44,35 @@ interval = 600
schema = schemas.wview.schema
def configDatabases(database_cls, database_dict):
def configDatabases(config_dict, binding):
"""Configures the archive databases."""
global schema
# Check to see if it already exists and is configured correctly.
try:
with database_cls(database_dict) as archive:
with weewx.database.open_database(config_dict, binding) as archive:
if archive.firstGoodStamp() == start_ts and archive.lastGoodStamp() == stop_ts:
# Database already exists. We're done.
return
except:
except weedb.DatabaseError:
pass
# Delete anything that might already be there.
try:
weedb.drop(database_dict)
except:
weewx.database.drop_database(config_dict, binding)
except weedb.DatabaseError:
pass
# Need to build a new synthetic database. General strategy is to create the
# archive data, THEN backfill with the daily summaries. This is faster than
# creating the daily summaries on the fly.
# creating the daily summaries on the fly.
# First, we need to modify the configuration dictionary that was passed in
# so it uses the DBManager, instead of the daily summary manager
monkey_dict = config_dict.dict()
monkey_dict['Bindings'][binding]['manager'] = 'weewx.database.DBManager'
with weewx.database.DBManager(database_dict, schema) as archive:
with weewx.database.open_database(monkey_dict, binding, initialize=True) as archive:
# Because this can generate voluminous log information,
# suppress all but the essentials:
@@ -81,7 +84,7 @@ def configDatabases(database_cls, database_dict):
t2 = time.time()
print "Time to create synthetic archive database = %6.2fs" % (t2-t1,)
with database_cls(database_dict, schema) as archive:
with weewx.database.open_database(config_dict, binding, initialize=True) as archive:
# Now go back to regular logging:
syslog.setlogmask(syslog.LOG_UPTO(syslog.LOG_DEBUG))

View File

@@ -0,0 +1,284 @@
# -*- coding: utf-8 -*-
#
# Copyright (c) 2011, 2012 Tom Keffer <tkeffer@gmail.com>
#
# See the file LICENSE.txt for your full rights.
#
# $Revision$
# $Author$
# $Date$
#
"""Unit test module weewx.wxstats"""
from __future__ import with_statement
import os
import sys
import syslog
import time
import unittest
import math
import configobj
import weeutil.weeutil
import weewx.tags
import gen_fake_data
day_keys = [x[0] for x in gen_fake_data.schema if x[0] not in ['dateTime', 'interval', 'usUnits']] + ['wind']
config_path = "testgen.conf"
cwd = None
skin_dict = {'Units' : {'Trend': {'time_delta': 3600, 'time_grace': 300},
'DegreeDay' : {'heating_base' : "65, degree_F",
'cooling_base' : "65, degree_C"} } }
class Common(unittest.TestCase):
def setUp(self):
global config_path
global cwd
weewx.debug = 1
syslog.openlog('test_stats', syslog.LOG_CONS)
syslog.setlogmask(syslog.LOG_UPTO(syslog.LOG_DEBUG))
# Save and set the current working directory in case some service changes it.
if not cwd:
cwd = os.getcwd()
else:
os.chdir(cwd)
try :
self.config_dict = configobj.ConfigObj(config_path, file_error=True)
except IOError:
sys.stderr.write("Unable to open configuration file %s" % self.config_path)
# Reraise the exception (this will eventually cause the program to exit)
raise
except configobj.ConfigObjError:
sys.stderr.write("Error while parsing configuration file %s" % config_path)
raise
# This will generate the test databases if necessary:
gen_fake_data.configDatabases(self.config_dict, self.binding)
def tearDown(self):
pass
def test_create_stats(self):
global day_keys
with weewx.database.open_database(self.config_dict, self.binding) as manager:
self.assertItemsEqual(sorted(manager.daykeys), sorted(day_keys))
self.assertEqual(manager.connection.columnsOf('archive_day_barometer'),
['dateTime', 'min', 'mintime', 'max', 'maxtime', 'sum', 'count', 'wsum', 'sumtime'])
self.assertEqual(manager.connection.columnsOf('archive_day_wind'),
['dateTime', 'min', 'mintime', 'max', 'maxtime', 'sum', 'count', 'wsum', 'sumtime',
'max_dir', 'xsum', 'ysum', 'dirsumtime', 'squaresum', 'wsquaresum'])
def testScalarTally(self):
with weewx.database.open_database(self.config_dict, self.binding) as manager:
# Pick a random day, say 15 March:
start_ts = int(time.mktime((2010,3,15,0,0,0,0,0,-1)))
stop_ts = int(time.mktime((2010,3,16,0,0,0,0,0,-1)))
# Sanity check that this is truly the start of day:
self.assertEqual(start_ts, weeutil.weeutil.startOfDay(start_ts))
# Get a day's stats from the daily summaries:
allStats = manager._get_day_summary(start_ts)
# Now calculate the same summaries from the raw data in the archive.
# Here are some random observation types:
for stats_type in ['barometer', 'outTemp', 'rain']:
# Now test all the aggregates:
for aggregate in ['min', 'max', 'sum', 'count', 'avg']:
# Compare to the main archive:
res = manager.getSql("SELECT %s(%s) FROM archive WHERE dateTime>? AND dateTime <=?;" % (aggregate, stats_type), (start_ts, stop_ts))
# The results from the daily summaries for this aggregation
allStats_res = getattr(allStats[stats_type], aggregate)
self.assertAlmostEqual(allStats_res, res[0], msg="Value check. Failing type %s, aggregate: %s" % (stats_type, aggregate))
# Check the times of min and max as well:
if aggregate in ['min','max']:
res2 = manager.getSql("SELECT dateTime FROM archive WHERE %s = ? AND dateTime>? AND dateTime <=?" % (stats_type,), (res[0], start_ts, stop_ts))
stats_time = getattr(allStats[stats_type], aggregate+'time')
self.assertEqual(stats_time, res2[0], "Time check. Failing type %s, aggregate: %s" % (stats_type, aggregate))
def testWindTally(self):
with weewx.database.open_database(self.config_dict, self.binding) as manager:
# Pick a random day, say 15 March:
start_ts = int(time.mktime((2010,3,15,0,0,0,0,0,-1)))
stop_ts = int(time.mktime((2010,3,16,0,0,0,0,0,-1)))
# Sanity check that this is truly the start of day:
self.assertEqual(start_ts, weeutil.weeutil.startOfDay(start_ts))
allStats = manager._get_day_summary(start_ts)
# Test all the aggregates:
for aggregate in ['min', 'max', 'sum', 'count', 'avg']:
if aggregate == 'max':
res = manager.getSql("SELECT MAX(windGust) FROM archive WHERE dateTime>? AND dateTime <=?;", (start_ts, stop_ts))
else:
res = manager.getSql("SELECT %s(windSpeed) FROM archive WHERE dateTime>? AND dateTime <=?;" % (aggregate, ), (start_ts, stop_ts))
# From StatsDb:
allStats_res = getattr(allStats['wind'], aggregate)
self.assertAlmostEqual(allStats_res, res[0])
# Check the times of min and max as well:
if aggregate == 'min':
resmin = manager.getSql("SELECT dateTime FROM archive WHERE windSpeed = ? AND dateTime>? AND dateTime <=?", (res[0], start_ts, stop_ts))
self.assertEqual(allStats['wind'].mintime, resmin[0])
elif aggregate == 'max':
resmax = manager.getSql("SELECT dateTime FROM archive WHERE windGust = ? AND dateTime>? AND dateTime <=?", (res[0], start_ts, stop_ts))
self.assertEqual(allStats['wind'].maxtime, resmax[0])
# Check RMS:
(squaresum, count) = manager.getSql("SELECT SUM(windSpeed*windSpeed), COUNT(windSpeed) from archive where dateTime>? AND dateTime<=?;", (start_ts, stop_ts))
rms = math.sqrt(squaresum/count) if count else None
self.assertAlmostEqual(allStats['wind'].rms, rms)
def testTags(self):
global skin_dict
with weewx.database.open_database(self.config_dict, self.binding) as manager:
spans = {'day' : weeutil.weeutil.TimeSpan(time.mktime((2010,3,15,0,0,0,0,0,-1)),
time.mktime((2010,3,16,0,0,0,0,0,-1))),
'week' : weeutil.weeutil.TimeSpan(time.mktime((2010,3,14,0,0,0,0,0,-1)),
time.mktime((2010,3,21,0,0,0,0,0,-1))),
'month': weeutil.weeutil.TimeSpan(time.mktime((2010,3,01,0,0,0,0,0,-1)),
time.mktime((2010,4,01,0,0,0,0,0,-1))),
'year' : weeutil.weeutil.TimeSpan(time.mktime((2010,1,01,0,0,0,0,0,-1)),
time.mktime((2011,1,01,0,0,0,0,0,-1)))}
# This may not necessarily execute in the order specified above:
for span in spans:
start_ts = spans[span].start
stop_ts = spans[span].stop
tagStats = weewx.tags.DatabaseBinder(manager, stop_ts,
rain_year_start=1,
skin_dict=skin_dict)
# Cycle over the statistical types:
for stats_type in ('barometer', 'outTemp', 'rain'):
# Now test all the aggregates:
for aggregate in ('min', 'max', 'sum', 'count', 'avg'):
# Compare to the main archive:
res = manager.getSql("SELECT %s(%s) FROM archive WHERE dateTime>? AND dateTime <=?;" % (aggregate, stats_type), (start_ts, stop_ts))
archive_result = res[0]
# This is how you form a tag such as tagStats.month.barometer.avg when
# all you have is strings holding the attributes:
value_helper = getattr(getattr(getattr(tagStats, span), stats_type), aggregate)
self.assertAlmostEqual(float(str(value_helper.formatted)), archive_result, places=1)
# Check the times of min and max as well:
if aggregate in ('min','max'):
res2 = manager.getSql("SELECT dateTime FROM archive WHERE %s = ? AND dateTime>? AND dateTime <=?" % (stats_type,), (archive_result, start_ts, stop_ts))
stats_value_helper = getattr(getattr(getattr(tagStats, span), stats_type), aggregate +'time')
self.assertEqual(stats_value_helper.raw, res2[0])
self.assertEqual(str(tagStats.day.barometer.avg), "30.675 inHg")
self.assertEqual(str(tagStats.day.barometer.min), "30.065 inHg")
self.assertEqual(str(tagStats.day.barometer.max), "31.000 inHg")
self.assertEqual(str(tagStats.day.barometer.mintime), "00:00")
self.assertEqual(str(tagStats.day.barometer.maxtime), "01:00")
self.assertEqual(str(tagStats.week.barometer.avg), "29.904 inHg")
self.assertEqual(str(tagStats.week.barometer.min), "29.000 inHg")
self.assertEqual(str(tagStats.week.barometer.max), "31.000 inHg")
self.assertEqual(str(tagStats.week.barometer.mintime), "01:00 on Monday")
self.assertEqual(str(tagStats.week.barometer.maxtime), "01:00 on Wednesday")
self.assertEqual(str(tagStats.month.barometer.avg), "30.021 inHg")
self.assertEqual(str(tagStats.month.barometer.min), "29.000 inHg")
self.assertEqual(str(tagStats.month.barometer.max), "31.000 inHg")
self.assertEqual(str(tagStats.month.barometer.mintime), "05-Mar-2010 00:00")
self.assertEqual(str(tagStats.month.barometer.maxtime), "03-Mar-2010 00:00")
self.assertEqual(str(tagStats.year.barometer.avg), "30.002 inHg")
self.assertEqual(str(tagStats.year.barometer.min), "29.000 inHg")
self.assertEqual(str(tagStats.year.barometer.max), "31.000 inHg")
self.assertEqual(str(tagStats.year.barometer.mintime), "04-Jan-2010 00:00")
self.assertEqual(str(tagStats.year.barometer.maxtime), "02-Jan-2010 00:00")
self.assertEqual(str(tagStats.day.outTemp.avg), "38.8°F")
self.assertEqual(str(tagStats.day.outTemp.min), "18.6°F")
self.assertEqual(str(tagStats.day.outTemp.max), "59.0°F")
self.assertEqual(str(tagStats.day.outTemp.mintime), "07:00")
self.assertEqual(str(tagStats.day.outTemp.maxtime), "19:00")
self.assertEqual(str(tagStats.week.outTemp.avg), "38.8°F")
self.assertEqual(str(tagStats.week.outTemp.min), "16.6°F")
self.assertEqual(str(tagStats.week.outTemp.max), "61.0°F")
self.assertEqual(str(tagStats.week.outTemp.mintime), "07:00 on Sunday")
self.assertEqual(str(tagStats.week.outTemp.maxtime), "19:00 on Saturday")
self.assertEqual(str(tagStats.month.outTemp.avg), "28.7°F")
self.assertEqual(str(tagStats.month.outTemp.min), "-0.9°F")
self.assertEqual(str(tagStats.month.outTemp.max), "59.0°F")
self.assertEqual(str(tagStats.month.outTemp.mintime), "01-Mar-2010 06:00")
self.assertEqual(str(tagStats.month.outTemp.maxtime), "31-Mar-2010 19:00")
self.assertEqual(str(tagStats.year.outTemp.avg), "40.0°F")
self.assertEqual(str(tagStats.year.outTemp.min), "-20.0°F")
self.assertEqual(str(tagStats.year.outTemp.max), "100.0°F")
self.assertEqual(str(tagStats.year.outTemp.mintime), "01-Jan-2010 06:00")
self.assertEqual(str(tagStats.year.outTemp.maxtime), "02-Jul-2010 19:00")
# Check the special aggregate types "exists" and "has_data":
self.assertTrue(tagStats.year.barometer.exists)
self.assertTrue(tagStats.year.barometer.has_data)
self.assertFalse(tagStats.year.bar.exists)
self.assertFalse(tagStats.year.bar.has_data)
self.assertTrue(tagStats.year.inHumidity.exists)
self.assertFalse(tagStats.year.inHumidity.has_data)
def test_rainYear(self):
with weewx.database.open_database(self.config_dict, self.binding) as manager:
stop_ts = time.mktime((2011,1,01,0,0,0,0,0,-1))
# Check for a rain year starting 1-Jan
tagStats = weewx.tags.DatabaseBinder(manager, stop_ts,
rain_year_start=1)
self.assertEqual(str(tagStats.rainyear.rain.sum), "86.59 in")
# Do it again, for starting 1-Oct:
tagStats = weewx.tags.DatabaseBinder(manager, stop_ts,
rain_year_start=10)
self.assertEqual(str(tagStats.rainyear.rain.sum), "21.89 in")
def test_heatcool(self):
with weewx.database.open_database(self.config_dict, self.binding) as manager:
#Test heating and cooling degree days:
stop_ts = time.mktime((2011,1,01,0,0,0,0,0,-1))
tagStats = weewx.tags.DatabaseBinder(manager, stop_ts,
skin_dict=skin_dict)
self.assertEqual(str(tagStats.year.heatdeg.sum), "10150.7°F-day")
self.assertEqual(str(tagStats.year.cooldeg.sum), "1026.2°F-day")
class TestSqlite(Common):
def __init__(self, *args, **kwargs):
self.binding = "wx_sqlite"
super(TestSqlite, self).__init__(*args, **kwargs)
class TestMySQL(Common):
def __init__(self, *args, **kwargs):
self.binding = "wx_mysql"
super(TestMySQL, self).__init__(*args, **kwargs)
def suite():
tests = ['test_create_stats', 'testScalarTally', 'testWindTally',
'testTags', 'test_rainYear', 'test_heatcool']
# Test both sqlite and MySQL:
return unittest.TestSuite(map(TestSqlite, tests) + map(TestMySQL, tests))
if __name__ == '__main__':
unittest.TextTestRunner(verbosity=2).run(suite())

View File

@@ -13,7 +13,7 @@ from __future__ import with_statement
import unittest
import time
import weewx.archive
import weewx.database
import weedb
import weeutil.weeutil
@@ -80,20 +80,20 @@ class Common(unittest.TestCase):
def test_no_archive(self):
# Attempt to open a non-existent database results in an exception:
self.assertRaises(weedb.OperationalError, weewx.archive.Archive.open, self.archive_db_dict)
self.assertRaises(weedb.OperationalError, weewx.database.DBManager.open, self.archive_db_dict)
def test_unitialized_archive(self):
_connect = weedb.create(self.archive_db_dict)
self.assertRaises(weewx.UninitializedDatabase, weewx.archive.Archive(_connect))
self.assertRaises(weewx.UninitializedDatabase, weewx.database.DBManager(_connect))
def test_create_archive(self):
archive = weewx.archive.Archive.open_with_create(self.archive_db_dict, archive_schema)
archive = weewx.database.DBManager.open_with_create(self.archive_db_dict, archive_schema)
self.assertItemsEqual(archive.connection.tables(), ['archive'])
self.assertEqual(archive.connection.columnsOf('archive'), ['dateTime', 'usUnits', 'interval', 'barometer', 'inTemp', 'outTemp', 'windSpeed'])
archive.close()
# Now that the database exists, these should also succeed:
archive = weewx.archive.Archive.open(self.archive_db_dict)
archive = weewx.database.DBManager.open(self.archive_db_dict)
self.assertItemsEqual(archive.connection.tables(), ['archive'])
self.assertEqual(archive.connection.columnsOf('archive'), ['dateTime', 'usUnits', 'interval', 'barometer', 'inTemp', 'outTemp', 'windSpeed'])
self.assertEqual(archive.sqlkeys, ['dateTime', 'usUnits', 'interval', 'barometer', 'inTemp', 'outTemp', 'windSpeed'])
@@ -101,7 +101,7 @@ class Common(unittest.TestCase):
archive.close()
def test_empty_archive(self):
archive = weewx.archive.Archive.open_with_create(self.archive_db_dict, archive_schema)
archive = weewx.database.DBManager.open_with_create(self.archive_db_dict, archive_schema)
self.assertEqual(archive.firstGoodStamp(), None)
self.assertEqual(archive.lastGoodStamp(), None)
self.assertEqual(archive.getRecord(123456789), None)
@@ -109,11 +109,11 @@ class Common(unittest.TestCase):
def test_add_archive_records(self):
# Test adding records using a 'with' statement:
with weewx.archive.Archive.open_with_create(self.archive_db_dict, archive_schema) as archive:
with weewx.database.DBManager.open_with_create(self.archive_db_dict, archive_schema) as archive:
archive.addRecord(genRecords())
# Now test to see what's in there:
with weewx.archive.Archive.open(self.archive_db_dict) as archive:
with weewx.database.DBManager.open(self.archive_db_dict) as archive:
self.assertEqual(archive.firstGoodStamp(), start_ts)
self.assertEqual(archive.lastGoodStamp(), stop_ts)
self.assertEqual(archive.std_unit_system, std_unit_system)
@@ -138,11 +138,11 @@ class Common(unittest.TestCase):
def test_get_records(self):
# Add a bunch of records:
with weewx.archive.Archive.open_with_create(self.archive_db_dict, archive_schema) as archive:
with weewx.database.DBManager.open_with_create(self.archive_db_dict, archive_schema) as archive:
archive.addRecord(genRecords())
# Now fetch them:
with weewx.archive.Archive.open(self.archive_db_dict) as archive:
with weewx.database.DBManager.open(self.archive_db_dict) as archive:
# Test getSql on existing type:
bar0 = archive.getSql("SELECT barometer FROM archive WHERE dateTime=?", (start_ts,))
self.assertEqual(bar0[0], barfunc(0))
@@ -193,7 +193,7 @@ class Common(unittest.TestCase):
self.assertEqual(_rec, None)
# Now try fetching them as vectors:
with weewx.archive.Archive.open(self.archive_db_dict) as archive:
with weewx.database.DBManager.open(self.archive_db_dict) as archive:
barvec = archive.getSqlVectors('barometer', start_ts, stop_ts)
# Recall that barvec will be a 3-way tuple. The first element is the vector of starting
# times, the second the vector of ending times, and the third the data vector.
@@ -204,7 +204,7 @@ class Common(unittest.TestCase):
# Start by setting up a generator function that will return the records to be
# included in each aggregation
gen = gen_included_recs(timevec, start_ts, stop_ts, 6*interval)
with weewx.archive.Archive.open(self.archive_db_dict) as archive:
with weewx.database.DBManager.open(self.archive_db_dict) as archive:
barvec = archive.getSqlVectors('barometer', start_ts, stop_ts, aggregate_interval=6*interval, aggregate_type='avg')
n_expected = int(nrecs / 6)
self.assertEqual(n_expected, len(barvec[0][0]))

View File

@@ -1,288 +0,0 @@
# -*- coding: utf-8 -*-
#
# Copyright (c) 2011, 2012 Tom Keffer <tkeffer@gmail.com>
#
# See the file LICENSE.txt for your full rights.
#
# $Revision$
# $Author$
# $Date$
#
"""Unit test module weewx.wxstats"""
from __future__ import with_statement
import os
import sys
import syslog
import time
import unittest
import math
import configobj
import weeutil.weeutil
import weewx.archive
import weewx.wxdaily
import weewx.tags
import gen_fake_data
day_keys = [x[0] for x in gen_fake_data.schema if x[0] not in ['dateTime', 'interval', 'usUnits']] + ['wind']
config_path = "testgen.conf"
cwd = None
class Common(unittest.TestCase):
def setUp(self):
global config_path
global cwd
weewx.debug = 1
syslog.openlog('test_stats', syslog.LOG_CONS)
syslog.setlogmask(syslog.LOG_UPTO(syslog.LOG_DEBUG))
# Save and set the current working directory in case some service changes it.
if not cwd:
cwd = os.getcwd()
else:
os.chdir(cwd)
try :
config_dict = configobj.ConfigObj(config_path, file_error=True)
except IOError:
sys.stderr.write("Unable to open configuration file %s" % self.config_path)
# Reraise the exception (this will eventually cause the program to exit)
raise
except configobj.ConfigObjError:
sys.stderr.write("Error while parsing configuration file %s" % config_path)
raise
database_manager, self.database_dict = weewx.archive.prep_database(config_dict, self.binding)
# Get the class object of the manager to be used:
self.database_cls = weeutil.weeutil._get_object(database_manager)
# This will generate the test databases if necessary:
gen_fake_data.configDatabases(self.database_cls, self.database_dict)
def tearDown(self):
pass
def test_create_stats(self):
global day_keys
with self.database_cls.open(self.database_dict) as archive:
self.assertItemsEqual(sorted(archive.daykeys), sorted(day_keys))
self.assertEqual(archive.connection.columnsOf('day_barometer'),
['dateTime', 'min', 'mintime', 'max', 'maxtime', 'sum', 'count', 'wsum', 'sumtime'])
self.assertEqual(archive.connection.columnsOf('day_wind'),
['dateTime', 'min', 'mintime', 'max', 'maxtime', 'sum', 'count', 'wsum', 'sumtime',
'max_dir', 'xsum', 'ysum', 'dirsumtime', 'squaresum', 'wsquaresum'])
def testScalarTally(self):
with weewx.wxdaily.WXDaySummaryArchive.open(self.database_dict) as stats:
with weewx.archive.Archive.open(self.database_dict) as archive:
# Pick a random day, say 15 March:
start_ts = int(time.mktime((2010,3,15,0,0,0,0,0,-1)))
stop_ts = int(time.mktime((2010,3,16,0,0,0,0,0,-1)))
# Sanity check that this is truly the start of day:
self.assertEqual(start_ts, weeutil.weeutil.startOfDay(start_ts))
allStats = stats._get_day_summary(start_ts)
# Test it against some types
# Should also test monthly, yearly summaries
for stats_type in ['barometer', 'outTemp', 'rain']:
# Now test all the aggregates:
for aggregate in ['min', 'max', 'sum', 'count', 'avg']:
# Compare to the main archive:
res = archive.getSql("SELECT %s(%s) FROM archive WHERE dateTime>? AND dateTime <=?;" % (aggregate, stats_type), (start_ts, stop_ts))
# From StatsDb:
allStats_res = getattr(allStats[stats_type], aggregate)
self.assertAlmostEqual(allStats_res, res[0], msg="Value check. Failing type %s, aggregate: %s" % (stats_type, aggregate))
# Check the times of min and max as well:
if aggregate in ['min','max']:
res2 = archive.getSql("SELECT dateTime FROM archive WHERE %s = ? AND dateTime>? AND dateTime <=?" % (stats_type,), (res[0], start_ts, stop_ts))
stats_time = getattr(allStats[stats_type], aggregate+'time')
self.assertEqual(stats_time, res2[0], "Time check. Failing type %s, aggregate: %s" % (stats_type, aggregate))
def testWindTally(self):
with weewx.wxdaily.WXDaySummaryArchive.open(self.database_dict) as stats:
with weewx.archive.Archive.open(self.database_dict) as archive:
# Pick a random day, say 15 March:
start_ts = int(time.mktime((2010,3,15,0,0,0,0,0,-1)))
stop_ts = int(time.mktime((2010,3,16,0,0,0,0,0,-1)))
# Sanity check that this is truly the start of day:
self.assertEqual(start_ts, weeutil.weeutil.startOfDay(start_ts))
allStats = stats._get_day_summary(start_ts)
# Test all the aggregates:
for aggregate in ['min', 'max', 'sum', 'count', 'avg']:
if aggregate == 'max':
res = archive.getSql("SELECT MAX(windGust) FROM archive WHERE dateTime>? AND dateTime <=?;", (start_ts, stop_ts))
else:
res = archive.getSql("SELECT %s(windSpeed) FROM archive WHERE dateTime>? AND dateTime <=?;" % (aggregate, ), (start_ts, stop_ts))
# From StatsDb:
allStats_res = getattr(allStats['wind'], aggregate)
self.assertAlmostEqual(allStats_res, res[0])
# Check the times of min and max as well:
if aggregate == 'min':
resmin = archive.getSql("SELECT dateTime FROM archive WHERE windSpeed = ? AND dateTime>? AND dateTime <=?", (res[0], start_ts, stop_ts))
self.assertEqual(allStats['wind'].mintime, resmin[0])
elif aggregate == 'max':
resmax = archive.getSql("SELECT dateTime FROM archive WHERE windGust = ? AND dateTime>? AND dateTime <=?", (res[0], start_ts, stop_ts))
self.assertEqual(allStats['wind'].maxtime, resmax[0])
# Check RMS:
(squaresum, count) = archive.getSql("SELECT SUM(windSpeed*windSpeed), COUNT(windSpeed) from archive where dateTime>? AND dateTime<=?;", (start_ts, stop_ts))
rms = math.sqrt(squaresum/count) if count else None
self.assertAlmostEqual(allStats['wind'].rms, rms)
def testTags(self):
with weewx.wxdaily.WXDaySummaryArchive.open(self.database_dict) as stats:
with weewx.archive.Archive.open(self.database_dict) as archive:
spans = {'day' : weeutil.weeutil.TimeSpan(time.mktime((2010,3,15,0,0,0,0,0,-1)),
time.mktime((2010,3,16,0,0,0,0,0,-1))),
'week' : weeutil.weeutil.TimeSpan(time.mktime((2010,3,14,0,0,0,0,0,-1)),
time.mktime((2010,3,21,0,0,0,0,0,-1))),
'month': weeutil.weeutil.TimeSpan(time.mktime((2010,3,01,0,0,0,0,0,-1)),
time.mktime((2010,4,01,0,0,0,0,0,-1))),
'year' : weeutil.weeutil.TimeSpan(time.mktime((2010,1,01,0,0,0,0,0,-1)),
time.mktime((2011,1,01,0,0,0,0,0,-1)))}
# This may not necessarily execute in the order specified above:
for span in spans:
start_ts = spans[span].start
stop_ts = spans[span].stop
tagStats = weewx.tags.DatabaseBinder(stats, stop_ts,
rain_year_start=1,
heatbase=(65.0, 'degree_F', 'group_temperature'),
coolbase=(65.0, 'degree_F', 'group_temperature'))
# Cycle over the statistical types:
for stats_type in ('barometer', 'outTemp', 'rain'):
# Now test all the aggregates:
for aggregate in ('min', 'max', 'sum', 'count', 'avg'):
# Compare to the main archive:
res = archive.getSql("SELECT %s(%s) FROM archive WHERE dateTime>? AND dateTime <=?;" % (aggregate, stats_type), (start_ts, stop_ts))
archive_result = res[0]
# This is how you form a tag such as tagStats.month.barometer.avg when
# all you have is strings holding the attributes:
value_helper = getattr(getattr(getattr(tagStats, span), stats_type), aggregate)
self.assertAlmostEqual(float(str(value_helper.formatted)), archive_result, places=1)
# Check the times of min and max as well:
if aggregate in ('min','max'):
res2 = archive.getSql("SELECT dateTime FROM archive WHERE %s = ? AND dateTime>? AND dateTime <=?" % (stats_type,), (archive_result, start_ts, stop_ts))
stats_value_helper = getattr(getattr(getattr(tagStats, span), stats_type), aggregate +'time')
self.assertEqual(stats_value_helper.raw, res2[0])
self.assertEqual(str(tagStats.day.barometer.avg), "30.675 inHg")
self.assertEqual(str(tagStats.day.barometer.min), "30.065 inHg")
self.assertEqual(str(tagStats.day.barometer.max), "31.000 inHg")
self.assertEqual(str(tagStats.day.barometer.mintime), "00:00")
self.assertEqual(str(tagStats.day.barometer.maxtime), "01:00")
self.assertEqual(str(tagStats.week.barometer.avg), "29.904 inHg")
self.assertEqual(str(tagStats.week.barometer.min), "29.000 inHg")
self.assertEqual(str(tagStats.week.barometer.max), "31.000 inHg")
self.assertEqual(str(tagStats.week.barometer.mintime), "01:00 on Monday")
self.assertEqual(str(tagStats.week.barometer.maxtime), "01:00 on Wednesday")
self.assertEqual(str(tagStats.month.barometer.avg), "30.021 inHg")
self.assertEqual(str(tagStats.month.barometer.min), "29.000 inHg")
self.assertEqual(str(tagStats.month.barometer.max), "31.000 inHg")
self.assertEqual(str(tagStats.month.barometer.mintime), "05-Mar-2010 00:00")
self.assertEqual(str(tagStats.month.barometer.maxtime), "03-Mar-2010 00:00")
self.assertEqual(str(tagStats.year.barometer.avg), "30.002 inHg")
self.assertEqual(str(tagStats.year.barometer.min), "29.000 inHg")
self.assertEqual(str(tagStats.year.barometer.max), "31.000 inHg")
self.assertEqual(str(tagStats.year.barometer.mintime), "04-Jan-2010 00:00")
self.assertEqual(str(tagStats.year.barometer.maxtime), "02-Jan-2010 00:00")
self.assertEqual(str(tagStats.day.outTemp.avg), "38.8°F")
self.assertEqual(str(tagStats.day.outTemp.min), "18.6°F")
self.assertEqual(str(tagStats.day.outTemp.max), "59.0°F")
self.assertEqual(str(tagStats.day.outTemp.mintime), "07:00")
self.assertEqual(str(tagStats.day.outTemp.maxtime), "19:00")
self.assertEqual(str(tagStats.week.outTemp.avg), "38.8°F")
self.assertEqual(str(tagStats.week.outTemp.min), "16.6°F")
self.assertEqual(str(tagStats.week.outTemp.max), "61.0°F")
self.assertEqual(str(tagStats.week.outTemp.mintime), "07:00 on Sunday")
self.assertEqual(str(tagStats.week.outTemp.maxtime), "19:00 on Saturday")
self.assertEqual(str(tagStats.month.outTemp.avg), "28.7°F")
self.assertEqual(str(tagStats.month.outTemp.min), "-0.9°F")
self.assertEqual(str(tagStats.month.outTemp.max), "59.0°F")
self.assertEqual(str(tagStats.month.outTemp.mintime), "01-Mar-2010 06:00")
self.assertEqual(str(tagStats.month.outTemp.maxtime), "31-Mar-2010 19:00")
self.assertEqual(str(tagStats.year.outTemp.avg), "40.0°F")
self.assertEqual(str(tagStats.year.outTemp.min), "-20.0°F")
self.assertEqual(str(tagStats.year.outTemp.max), "100.0°F")
self.assertEqual(str(tagStats.year.outTemp.mintime), "01-Jan-2010 06:00")
self.assertEqual(str(tagStats.year.outTemp.maxtime), "02-Jul-2010 19:00")
# Check the special aggregate types "exists" and "has_data":
self.assertTrue(tagStats.year.barometer.exists)
self.assertTrue(tagStats.year.barometer.has_data)
self.assertFalse(tagStats.year.bar.exists)
self.assertFalse(tagStats.year.bar.has_data)
self.assertTrue(tagStats.year.inHumidity.exists)
self.assertFalse(tagStats.year.inHumidity.has_data)
def test_rainYear(self):
with weewx.wxdaily.WXDaySummaryArchive.open(self.database_dict) as stats:
stop_ts = time.mktime((2011,1,01,0,0,0,0,0,-1))
# Check for a rain year starting 1-Jan
tagStats = weewx.tags.DatabaseBinder(stats, stop_ts,
rain_year_start=1)
self.assertEqual(str(tagStats.rainyear.rain.sum), "86.59 in")
# Do it again, for starting 1-Oct:
tagStats = weewx.tags.DatabaseBinder(stats, stop_ts,
rain_year_start=10)
self.assertEqual(str(tagStats.rainyear.rain.sum), "21.89 in")
def test_heatcool(self):
with weewx.wxdaily.WXDaySummaryArchive.open(self.database_dict) as stats:
#Test heating and cooling degree days:
stop_ts = time.mktime((2011,1,01,0,0,0,0,0,-1))
tagStats = weewx.tags.DatabaseBinder(stats, stop_ts,
heatbase=(65.0, 'degree_F', 'group_temperature'),
coolbase=(65.0, 'degree_F', 'group_temperature'))
self.assertEqual(str(tagStats.year.heatdeg.sum), "10150.7°F-day")
self.assertEqual(str(tagStats.year.cooldeg.sum), "1026.2°F-day")
class TestSqlite(Common):
def __init__(self, *args, **kwargs):
self.binding = "wx_sqlite"
super(TestSqlite, self).__init__(*args, **kwargs)
class TestMySQL(Common):
def __init__(self, *args, **kwargs):
self.binding = "wx_mysql"
super(TestMySQL, self).__init__(*args, **kwargs)
def suite():
tests = ['test_create_stats', 'testScalarTally', 'testWindTally',
'testTags', 'test_rainYear', 'test_heatcool']
# return unittest.TestSuite(map(TestSqlite, tests) + map(TestMySQL, tests))
return unittest.TestSuite(map(TestSqlite, tests) )
if __name__ == '__main__':
unittest.TextTestRunner(verbosity=2).run(suite())