better strategy for figuring out the stop time of an aggregation interval.

Updated test suite.
This commit is contained in:
Tom Keffer
2012-10-19 16:07:27 +00:00
parent 3e32925c64
commit 2b5ea6cef0
3 changed files with 68 additions and 21 deletions

View File

@@ -23,7 +23,13 @@ class Common(unittest.TestCase):
weedb.drop(self.db_dict)
except:
pass
def tearDown(self):
try:
weedb.drop(self.db_dict)
except:
pass
def populate_db(self):
weedb.create(self.db_dict)
self.assertRaises(weedb.DatabaseExists, weedb.create, self.db_dict)

View File

@@ -335,7 +335,7 @@ class Archive(object):
if aggregate_interval :
if not aggregate_type:
raise weewx.ViolatedPrecondition, "Aggregation type missing"
sql_str = 'SELECT %s(%s), usUnits FROM archive WHERE dateTime > ? AND dateTime <= ?' % (aggregate_type, sql_type)
sql_str = 'SELECT MAX(dateTime), %s(%s), usUnits FROM archive WHERE dateTime > ? AND dateTime <= ?' % (aggregate_type, sql_type)
for stamp in weeutil.weeutil.intervalgen(startstamp, stopstamp, aggregate_interval):
_cursor.execute(sql_str, stamp)
_rec = _cursor.fetchone()
@@ -343,12 +343,12 @@ class Archive(object):
# (signified by a null result)
if _rec:
if std_unit_system:
if std_unit_system != _rec[1]:
if std_unit_system != _rec[2]:
raise weewx.UnsupportedFeature, "Unit type cannot change within a time interval."
else:
std_unit_system = _rec[1]
time_vec.append(stamp[1]) # The timestamp on the right gets associated with the aggregation
data_vec.append(_rec[0])
std_unit_system = _rec[2]
time_vec.append(_rec[0])
data_vec.append(_rec[1])
else:
sql_str = 'SELECT dateTime, %s, usUnits FROM archive WHERE dateTime >= ? AND dateTime <= ?' % sql_type
for _rec in _cursor.execute(sql_str, (startstamp, stopstamp)):
@@ -433,7 +433,7 @@ class Archive(object):
raise weewx.ViolatedPrecondition, "Aggregation type missing or unknown"
# This SQL select string will select the proper wind types
sql_str = 'SELECT %s, usUnits FROM archive WHERE dateTime > ? AND dateTime <= ?' % windvec_types[ext_type]
sql_str = 'SELECT MAX(dateTime), %s, usUnits FROM archive WHERE dateTime > ? AND dateTime <= ?' % windvec_types[ext_type]
# Go through each aggregation interval, calculating the aggregation.
for stamp in weeutil.weeutil.intervalgen(startstamp, stopstamp, aggregate_interval):
@@ -443,7 +443,7 @@ class Archive(object):
_last_time = None
for _rec in _cursor.execute(sql_str, stamp):
(_mag, _dir) = _rec[0:2]
(_mag, _dir) = _rec[1:3]
if _mag is None:
continue
@@ -451,12 +451,12 @@ class Archive(object):
# A good direction is necessary unless the mag is zero:
if _mag == 0.0 or _dir is not None:
_count += 1
_last_time = stamp[1]
_last_time = _rec[0]
if std_unit_system:
if std_unit_system != _rec[2]:
if std_unit_system != _rec[3]:
raise weewx.UnsupportedFeature, "Unit type cannot change within a time interval."
else:
std_unit_system = _rec[2]
std_unit_system = _rec[3]
# Pick the kind of aggregation:
if aggregate_type == 'min':

View File

@@ -9,11 +9,13 @@
# $Date: 2012-10-11 16:55:54 -0700 (Thu, 11 Oct 2012) $
#
"""Test archive and stats database modules"""
from __future__ import with_statement
import unittest
import time
import weewx.archive
import weedb
import weeutil.weeutil
archive_sqlite = {'database': '/tmp/weedb.sdb', 'driver':'weedb.sqlite'}
archive_mysql = {'database': 'test_weedb', 'user':'weewx', 'password':'weewx', 'driver':'weedb.mysql'}
@@ -28,10 +30,10 @@ archive_schema = [('dateTime', 'INTEGER NOT NULL UNIQUE PRIMARY KEY'
std_unit_system = 1
interval = 3600 # One hour
nrecs = 12
nrecs = 48 # Two days
start_ts = int(time.mktime((2012, 07, 01, 00, 00, 0, 0, 0, -1))) # 1 July 2012
stop_ts = start_ts + interval * nrecs
last_ts = start_ts + interval * (nrecs-1)
stop_ts = start_ts + interval * (nrecs-1)
timevec = [start_ts+i*interval for i in range(nrecs)]
def timefunc(i):
return start_ts + i*interval
@@ -40,12 +42,24 @@ def barfunc(i):
def temperfunc(i):
return 68.0 + 0.1*i
def gen_included_recs(timevec, start_ts, stop_ts, agg_interval):
for stamp in weeutil.weeutil.intervalgen(start_ts, stop_ts, agg_interval):
included = []
for (irec, ts) in enumerate(timevec):
if stamp[0] < ts <= stamp[1]:
included.append(irec)
yield included
def genRecords():
for irec in range(nrecs):
_record = {'dateTime': timefunc(irec), 'interval': interval, 'usUnits' : 1,
'outTemp': temperfunc(irec), 'barometer': barfunc(irec), 'inTemp': 70.0 + 0.1*irec}
yield _record
for rec in genRecords():
print weeutil.weeutil.timestamp_to_string(rec['dateTime']), rec
time.sleep(0.5)
class Common(unittest.TestCase):
def setUp(self):
@@ -54,6 +68,12 @@ class Common(unittest.TestCase):
except:
pass
def tearDown(self):
try:
weedb.drop(self.archive_db_dict)
except:
pass
def test_no_archive(self):
# Attempt to open a non-existent database results in an exception:
self.assertRaises(weedb.OperationalError, weewx.archive.Archive.open, self.archive_db_dict)
@@ -86,7 +106,7 @@ class Common(unittest.TestCase):
# Now test to see what's in there:
with weewx.archive.Archive.open(self.archive_db_dict) as archive:
self.assertEqual(archive.firstGoodStamp(), start_ts)
self.assertEqual(archive.lastGoodStamp(), last_ts)
self.assertEqual(archive.lastGoodStamp(), stop_ts)
self.assertEqual(archive.std_unit_system, std_unit_system)
expected_iterator = genRecords()
@@ -105,7 +125,7 @@ class Common(unittest.TestCase):
archive.addRecord(existing_record)
# Test changing the unit system. It should raise a ValueError exception:
metric_record = {'dateTime': last_ts + interval, 'interval': interval, 'usUnits' : 16, 'outTemp': 20.0}
metric_record = {'dateTime': stop_ts + interval, 'interval': interval, 'usUnits' : 16, 'outTemp': 20.0}
self.assertRaises(ValueError, archive.addRecord, metric_record)
def test_get_records(self):
@@ -114,7 +134,7 @@ class Common(unittest.TestCase):
archive.addRecord(genRecords())
# Now fetch them:
with weewx.archive.Archive.open_with_create(self.archive_db_dict, archive_schema) as archive:
with weewx.archive.Archive.open(self.archive_db_dict) as archive:
# Test getSql:
bar0 = archive.getSql("SELECT barometer FROM archive WHERE dateTime=?", (start_ts,))
self.assertEqual(bar0[0], barfunc(0))
@@ -123,11 +143,32 @@ class Common(unittest.TestCase):
for (irec,_row) in enumerate(archive.genSql("SELECT barometer FROM archive;")):
self.assertEqual(_row[0], barfunc(irec))
# Test getSqlVectors:
barvec = archive.getSqlVectors('barometer', start_ts, last_ts)
self.assertEqual(barvec[1], ([barfunc(irec) for irec in range(nrecs)], "inHg", "group_pressure"))
# Now try fetching them as vectors:
with weewx.archive.Archive.open(self.archive_db_dict) as archive:
barvec = archive.getSqlVectors('barometer', start_ts, stop_ts)
# Recall that barvec will be a 2-way tuple. The first element is the value tuple of
# the time vector, the second of the data vector.
self.assertEqual(barvec[0], ([timefunc(irec) for irec in range(nrecs)], "unix_epoch", "group_time"))
self.assertEqual(barvec[1], ([barfunc(irec) for irec in range(nrecs)], "inHg", "group_pressure"))
# Now try fetching the vectora gain, but using aggregation.
# Start by setting up a generator function that will return the records to be
# included in each aggregation
gen = gen_included_recs(timevec, start_ts, stop_ts, 6*interval)
with weewx.archive.Archive.open(self.archive_db_dict) as archive:
barvec = archive.getSqlVectors('barometer', start_ts, stop_ts, aggregate_interval=6*interval, aggregate_type='avg')
n_expected = int(nrecs / 6)
self.assertEqual(n_expected, len(barvec[0][0]))
for irec in range(n_expected):
# Get the set of records to be included in this aggregation:
recs = gen.next()
# Make sure the timestamp of the aggregation interval is the same as the last
# record to be included in the aggregation:
self.assertEqual(timevec[max(recs)], barvec[0][0][irec])
# Calculate the expected average of the records included in the aggregation.
expected_avg = sum((barfunc(i) for i in recs)) / len(recs)
# Compare them.
self.assertAlmostEqual(expected_avg, barvec[1][0][irec])
class TestSqlite(Common):