Simplified initializers for StatsDb and Archive.

Working on test suites for the two.
This commit is contained in:
Tom Keffer
2012-10-18 01:40:30 +00:00
parent f90118e145
commit 36ffa28fa4
7 changed files with 197 additions and 206 deletions

View File

@@ -123,12 +123,3 @@ class Transaction(object):
except:
pass
#
# This is a utility function for converting a result set that might contain
# longs or decimal.Decimals (which MySQLdb uses) to something containing just ints.
#
import decimal
def massage(seq):
# Return the massaged sequence if it exists, otherwise, return None
if seq is not None:
return [int(i) if isinstance(i, long) or isinstance(i,decimal.Decimal) else i for i in seq]

View File

@@ -9,6 +9,8 @@
#
"""Driver for the MySQL database"""
import decimal
import MySQLdb
import _mysql_exceptions
@@ -169,7 +171,9 @@ class Cursor(object):
return self
def fetchone(self):
return self.cursor.fetchone()
# Get a result from the MySQL cursor, then run it through the massage
# filter below
return massage(self.cursor.fetchone())
def close(self):
try:
@@ -189,3 +193,12 @@ class Cursor(object):
if result is None:
raise StopIteration
return result
#
# This is a utility function for converting a result set that might contain
# longs or decimal.Decimals (which MySQLdb uses) to something containing just ints.
#
def massage(seq):
# Return the massaged sequence if it exists, otherwise, return None
if seq is not None:
return [int(i) if isinstance(i, long) or isinstance(i,decimal.Decimal) else i for i in seq]

View File

@@ -52,30 +52,31 @@ class Archive(object):
self.std_unit_system = _row[0] if _row is not None else None
@staticmethod
def fromDbDict(archive_db_dict):
def open(archive_db_dict):
"""Open an Archive database.
An exception of type weedb.OperationalError will be raised if the
database does not exist.
Returns:
An instance of Archive."""
connection = weedb.connect(archive_db_dict)
return Archive(connection)
@staticmethod
def fromConfigDict(config_dict):
archive_db = config_dict['StdArchive']['archive_database']
archive_db_dict = config_dict['Databases'][archive_db]
return Archive.fromDbDict(archive_db_dict)
@staticmethod
def open(archive_db_dict, archiveSchema=None):
def open_with_create(archive_db_dict, archiveSchema):
"""Open an Archive database, initializing it if necessary.
archive_db_dict: A database dictionary holding the information necessary
to open the database.
archiveSchema: The schema. If not provided, a default schema
will be used.
archiveSchema: The schema to be used
returns: An instance of Archive"""
Returns:
An instance of Archive"""
try:
archive = Archive.fromDbDict(archive_db_dict)
archive = Archive.open(archive_db_dict)
# The database exists and has been initialized. Return it.
return archive
except weedb.OperationalError:
@@ -88,11 +89,6 @@ class Archive(object):
except weedb.DatabaseExists:
pass
# Next, get the schema
if not archiveSchema:
import user.schemas
archiveSchema = user.schemas.defaultArchiveSchema
# List comprehension of the types, joined together with commas. Put
# the SQL type in backquotes, because at least one of them ('interval')
# is a MySQL reserved word
@@ -104,6 +100,7 @@ class Archive(object):
_cursor.execute("CREATE TABLE archive (%s);" % _sqltypestr)
except Exception, e:
_connect.close()
syslog.syslog(syslog.LOG_ERR, "archive: Unable to create database archive.")
syslog.syslog(syslog.LOG_ERR, "**** %s" % (e,))
raise
@@ -112,7 +109,6 @@ class Archive(object):
return Archive(_connect)
@property
def database(self):
return self.connection.database
@@ -120,6 +116,12 @@ class Archive(object):
def close(self):
self.connection.close()
def __enter__(self):
return self
def __exit__(self, etyp, einst, etb):
self.close()
def lastGoodStamp(self):
"""Retrieves the epoch time of the last good archive record.
@@ -218,7 +220,7 @@ class Archive(object):
_gen = _cursor.execute("SELECT * FROM archive WHERE dateTime > ? AND dateTime <= ?", (startstamp, stopstamp))
for _row in _gen :
yield dict(zip(self.sqlkeys, _row))
yield dict(zip(self.sqlkeys, _row)) if _row else None
finally:
_cursor.close()
@@ -538,58 +540,14 @@ class Archive(object):
column_list = self.connection.columnsOf('archive')
return column_list
#def config(db_dict, archiveSchema=None):
# """Configure a database for use with weewx. This will create the initial schema
# if necessary."""
#
# # Try to create the database. If it already exists, an exception will
# # be thrown.
# try:
# weedb.create(db_dict)
# except weedb.DatabaseExists:
# pass
#
# try:
# # Check to see if it has already been configured.
# _connect = weedb.connect(db_dict)
# if 'archive' in _connect.tables():
# return
#
# # If the user has not supplied a schema, use the default schema
# if not archiveSchema:
# import user.schemas
# archiveSchema = user.schemas.defaultArchiveSchema
#
# # List comprehension of the types, joined together with commas. Put
# # the SQL type in backquotes, because at least one of them ('interval')
# # is a MySQL reserved word
# _sqltypestr = ', '.join(["`%s` %s" % _type for _type in archiveSchema])
#
# with weedb.Transaction(_connect) as _cursor:
# _cursor.execute("CREATE TABLE archive (%s);" % _sqltypestr)
#
# except Exception, e:
# syslog.syslog(syslog.LOG_ERR, "archive: Unable to create database archive.")
# syslog.syslog(syslog.LOG_ERR, "**** %s" % (e,))
# raise
#
# finally:
# _connect.close()
#
# syslog.syslog(syslog.LOG_NOTICE, "archive: created schema for database 'archive'")
def reconfig(old_db_dict, new_db_dict, target_unit_system=None):
"""Copy over an old archive to a new one, using the new schema."""
config(new_db_dict)
oldArchive = Archive(old_db_dict)
newArchive = Archive(new_db_dict)
# Wrap the input generator in a unit converter.
record_generator = weewx.units.GenWithConvert(oldArchive.genBatchRecords(), target_unit_system)
with Archive.open(old_db_dict) as oldArchive:
with Archive.open_with_create(new_db_dict) as newArchive:
# This is very fast because it is done in a single transaction context:
newArchive.addRecord(record_generator)
newArchive.close()
oldArchive.close()
# Wrap the input generator in a unit converter.
record_generator = weewx.units.GenWithConvert(oldArchive.genBatchRecords(), target_unit_system)
# This is very fast because it is done in a single transaction context:
newArchive.addRecord(record_generator)

View File

@@ -151,30 +151,24 @@ class StatsDb(object):
self.std_unit_system = self._getStdUnitSystem()
@staticmethod
def fromDbDict(stats_db_dict):
def open(stats_db_dict):
connection = weedb.connect(stats_db_dict)
return StatsDb(connection)
@staticmethod
def fromConfigDict(config_dict):
stats_db = config_dict['StdArchive']['stats_database']
stats_db_dict = config_dict['Databases'][stats_db]
return StatsDb.fromDbDict(stats_db_dict)
@staticmethod
def open(stats_db_dict, stats_types=None):
def open_with_create(stats_db_dict, stats_types):
"""Open a StatsDb database, initializing it if necessary.
Does nothing if the database has already been initialized.
stats_types: an iterable collection with the names of the types for
which statistics will be gathered [optional. Default is to use all
possible types]"""
which statistics will be gathered.
Returns:
An instance of StatsDb"""
# If the database exists and has been initialized, then
# this will be successful. If not, an exception will be thrown.
try:
stats = StatsDb.fromDbDict(stats_db_dict)
stats = StatsDb.open(stats_db_dict)
# The database exists and has been initialized. Return it.
return stats
except weedb.OperationalError:
@@ -187,10 +181,6 @@ class StatsDb(object):
except weedb.DatabaseExists:
pass
# Next, get the schema.
if not stats_types:
import user.schemas
stats_types = user.schemas.defaultStatsTypes
# Heating and cooling degrees are not actually stored in the database:
final_stats_types = [s for s in stats_types if s not in ['heatdeg', 'cooldeg']]
@@ -207,6 +197,7 @@ class StatsDb(object):
_cursor.execute(meta_create_str)
_cursor.execute(meta_replace_str, ('unit_system', 'None'))
except Exception, e:
_connect.close()
syslog.syslog(syslog.LOG_ERR, "archive: Unable to create stats database.")
syslog.syslog(syslog.LOG_ERR, "**** %s" % (e,))
raise
@@ -222,6 +213,12 @@ class StatsDb(object):
def close(self):
self.connection.close()
def __enter__(self):
return self
def __exit__(self, etyp, einst, etb):
self.close()
def updateHiLo(self, accumulator):
"""Use the contents of an accumulator to update the highs/lows of a stats database."""
@@ -562,9 +559,7 @@ class StatsDb(object):
_cursor = self.connection.cursor()
try:
_cursor.execute(sqlStmt)
# Take the result set and run it through weedb.massage to convert
# any long's or decimal.Decimals to ints
return weedb.massage(_cursor.fetchone())
return _cursor.fetchone()
finally:
_cursor.close()
@@ -855,56 +850,3 @@ class StatsTypeHelper(object):
result = self.db.getAggregate(self.timespan, self.stats_type, aggregateType)
# Wrap the result in a ValueHelper:
return weewx.units.ValueHelper(result, self.context, self.formatter, self.converter)
#===============================================================================
# USEFUL FUNCTIONS
#===============================================================================
#def config(db_dict, stats_types=None):
# """Initialize the StatsDb database
#
# Does nothing if the database has already been initialized.
#
# stats_types: an iterable collection with the names of the types for
# which statistics will be gathered [optional. Default is to use all
# possible types]"""
# # Try to create the database. If it already exists, an exception will
# # be thrown.
# try:
# weedb.create(db_dict)
# except weedb.DatabaseExists:
# pass
#
# # Check to see if it has already been configured. If it has,
# # there will be some tables in it. We can just return.
# _connect = weedb.connect(db_dict)
# try:
# if _connect.tables():
# return
#
# # If no schema has been specified, use the default stats types:
# if not stats_types:
# import user.schemas
# stats_types = user.schemas.defaultStatsTypes
#
# # Heating and cooling degrees are not actually stored in the database:
# final_stats_types = filter(lambda x : x not in ['heatdeg', 'cooldeg'], stats_types)
#
# # Now create all the necessary tables as one transaction:
# with weedb.Transaction(_connect) as _cursor:
# for _stats_type in final_stats_types:
# # Slightly different SQL statement for wind
# if _stats_type == 'wind':
# _cursor.execute(wind_create_str)
# else:
# _cursor.execute(std_create_str % (_stats_type,))
# _cursor.execute(meta_create_str)
# _cursor.execute(meta_replace_str, ('unit_system', 'None'))
# except Exception, e:
# syslog.syslog(syslog.LOG_ERR, "archive: Unable to create stats database.")
# syslog.syslog(syslog.LOG_ERR, "**** %s" % (e,))
# raise
# finally:
# _connect.close()
#
# syslog.syslog(syslog.LOG_NOTICE, "stats: created schema for statistical database")

View File

@@ -9,64 +9,145 @@
# $Date: 2012-10-11 16:55:54 -0700 (Thu, 11 Oct 2012) $
#
"""Test archive and stats database modules"""
import syslog
import unittest
import configobj
import time
import weewx.archive
import weewx.stats
import weedb
class StatsTest(unittest.TestCase):
archive_sqlite = {'database': '/tmp/weedb.sdb', 'driver':'weedb.sqlite'}
stats_sqlite = {'database': '/tmp/stats.sdb', 'driver':'weedb.sqlite'}
archive_mysql = {'database': 'test_weedb', 'user':'weewx', 'password':'weewx', 'driver':'weedb.mysql'}
stats_mysql = {'database': 'test_stats', 'user':'weewx', 'password':'weewx', 'driver':'weedb.mysql'}
archive_schema = [('dateTime', 'INTEGER NOT NULL UNIQUE PRIMARY KEY'),
('usUnits', 'INTEGER NOT NULL'),
('interval', 'INTEGER NOT NULL'),
('barometer', 'REAL'),
('inTemp', 'REAL'),
('outTemp', 'REAL'),
('windSpeed', 'REAL')]
drop_list = ['dateTime', 'usUnits', 'interval', 'windSpeed', 'windDir', 'windGust', 'windGustDir']
stats_types = [_tuple[0] for _tuple in archive_schema if _tuple[0] not in drop_list] + ['wind']
std_unit_system = 1
interval = 300
nrecs = 20
start_ts = int(time.mktime((2012, 07, 01, 00, 00, 0, 0, 0, -1))) # 1 July 2012
stop_ts = start_ts + interval * nrecs
last_ts = start_ts + interval * (nrecs-1)
def genRecords():
for irec in range(nrecs):
_record = {'dateTime': start_ts + irec*interval, 'interval': interval, 'usUnits' : 1,
'outTemp': 68.0 + 0.1*irec, 'barometer': 30.0+0.01*irec, 'inTemp': 70.0 + 0.1*irec}
yield _record
class Common(unittest.TestCase):
def setUp(self):
global config_path
weewx.debug = 1
syslog.openlog('test_stats', syslog.LOG_CONS)
syslog.setlogmask(syslog.LOG_UPTO(syslog.LOG_DEBUG))
try :
self.config_dict = configobj.ConfigObj(config_path, file_error=True)
except IOError:
sys.stderr.write("Unable to open configuration file %s" % self.config_path)
# Reraise the exception (this will eventually cause the program to exit)
raise
except configobj.ConfigObjError:
sys.stderr.write("Error while parsing configuration file %s" % config_path)
raise
def test_archive(self):
archive_db = self.config_dict['StdArchive']['archive_database']
archive_db_dict = self.config_dict['Databases'][archive_db]
# First, make sure it does not exist:
try:
weedb.drop(archive_db_dict)
except weedb.NoDatabase:
weedb.drop(self.archive_db_dict)
except:
pass
try:
weedb.drop(self.stats_db_dict)
except:
pass
def test_no_archive(self):
# Attempt to open a non-existent database results in an exception:
self.assertRaises(weedb.OperationalError, weewx.archive.Archive.open, self.archive_db_dict)
def test_no_stats(self):
# Attempt to open a non-existent database results in an exception:
self.assertRaises(weedb.OperationalError, weewx.stats.StatsDb.open, self.stats_db_dict)
def test_create_archive(self):
archive = weewx.archive.Archive.open_with_create(self.archive_db_dict, archive_schema)
self.assertItemsEqual(archive.connection.tables(), ['archive'])
self.assertEqual(archive.connection.columnsOf('archive'), ['dateTime', 'usUnits', 'interval', 'barometer', 'inTemp', 'outTemp', 'windSpeed'])
archive.close()
# Now that the database exists, these should also succeed:
archive = weewx.archive.Archive.open(self.archive_db_dict)
self.assertItemsEqual(archive.connection.tables(), ['archive'])
self.assertEqual(archive.connection.columnsOf('archive'), ['dateTime', 'usUnits', 'interval', 'barometer', 'inTemp', 'outTemp', 'windSpeed'])
self.assertEqual(archive.sqlkeys, ['dateTime', 'usUnits', 'interval', 'barometer', 'inTemp', 'outTemp', 'windSpeed'])
self.assertEqual(archive.std_unit_system, None)
archive.close()
def test_create_stats(self):
stats = weewx.stats.StatsDb.open_with_create(self.stats_db_dict, stats_types)
self.assertItemsEqual(stats.connection.tables(), ['barometer', 'inTemp', 'outTemp', 'wind', 'metadata'])
self.assertEqual(stats.connection.columnsOf('barometer'), ['dateTime', 'min', 'mintime', 'max', 'maxtime', 'sum', 'count'])
self.assertEqual(stats.connection.columnsOf('wind'), ['dateTime', 'min', 'mintime', 'max', 'maxtime', 'sum', 'count', 'gustdir', 'xsum', 'ysum', 'squaresum', 'squarecount'])
stats.close()
# Now that the database exists, these should also succeed:
stats = weewx.stats.StatsDb.open(self.stats_db_dict)
self.assertItemsEqual(stats.connection.tables(), ['barometer', 'inTemp', 'outTemp', 'wind', 'metadata'])
self.assertEqual(stats.connection.columnsOf('barometer'), ['dateTime', 'min', 'mintime', 'max', 'maxtime', 'sum', 'count'])
self.assertEqual(stats.connection.columnsOf('wind'), ['dateTime', 'min', 'mintime', 'max', 'maxtime', 'sum', 'count', 'gustdir', 'xsum', 'ysum', 'squaresum', 'squarecount'])
stats.close()
def test_empty_archive(self):
archive = weewx.archive.Archive.open_with_create(self.archive_db_dict, archive_schema)
self.assertEqual(archive.firstGoodStamp(), None)
self.assertEqual(archive.lastGoodStamp(), None)
self.assertEqual(archive.getRecord(123456789), None)
def test_add_archive_records(self):
# Test adding records using a 'with' statement:
with weewx.archive.Archive.open_with_create(self.archive_db_dict, archive_schema) as archive:
archive.addRecord(genRecords())
# Now test to see what's in there:
with weewx.archive.Archive.open(self.archive_db_dict) as archive:
self.assertEqual(archive.firstGoodStamp(), start_ts)
self.assertEqual(archive.lastGoodStamp(), last_ts)
self.assertEqual(archive.std_unit_system, std_unit_system)
expected_iterator = genRecords()
for _rec in archive.genBatchRecords():
try:
_expected_rec = expected_iterator.next()
except StopIteration:
break
# Check that the missing windSpeed is None, then remove it in order to do the compare:
self.assertEqual(_rec.pop('windSpeed'), None)
self.assertEqual(_expected_rec, _rec)
# Test adding an existing record. It should just quietly swallow it:
existing_record = {'dateTime': start_ts, 'interval': interval, 'usUnits' : 1, 'outTemp': 68.0}
archive.addRecord(existing_record)
# Test changing the unit system. It should raise a ValueError exception:
metric_record = {'dateTime': last_ts + interval, 'interval': interval, 'usUnits' : 16, 'outTemp': 20.0}
self.assertRaises(ValueError, archive.addRecord, metric_record)
class TestSqlite(Common):
def __init__(self, *args, **kwargs):
self.archive_db_dict = archive_sqlite
self.stats_db_dict = stats_sqlite
super(TestSqlite, self).__init__(*args, **kwargs)
class TestMySQL(Common):
# Now an effort to open it should result in an exception:
archive = weewx.archive.Archive.fromDbDict(archive_db_dict)
# def test_stats(self):
# stats_db = config_dict['StdArchive']['stats_database']
# stats_db_dict = config_dict['Databases'][stats_db]
def __init__(self, *args, **kwargs):
self.archive_db_dict = archive_mysql
self.stats_db_dict = stats_mysql
super(TestMySQL, self).__init__(*args, **kwargs)
def suite():
tests = ['test_no_archive', 'test_no_stats', 'test_create_archive', 'test_create_stats',
'test_empty_archive', 'test_add_archive_records']
return unittest.TestSuite(map(TestSqlite, tests) + map(TestMySQL, tests))
if __name__ == '__main__':
import sys
global config_path
if len(sys.argv) < 2 :
print "Usage: python test_dbs.py path-to-configuration-file"
exit()
# Get the path to the configuration file, then delete it from the argument list:
config_path = sys.argv[1]
del sys.argv[1:]
unittest.main()
unittest.TextTestRunner(verbosity=2).run(suite())

View File

@@ -204,7 +204,7 @@ class My_install_data(install_data):
# The default target conversion units should be 'US':
new_config['StdConvert']['target_unit'] = 'US'
# Check to see if there is an existing config file.
# If so, merge its contents with the new one
if os.path.exists(config_path):
@@ -234,6 +234,12 @@ class My_install_data(install_data):
# Add the version:
new_config['version'] = VERSION
# This is to fix a name change from the alpha and beta versions of v2.0:
new_config['Databases']['archive_sqlite']['driver'] = 'weedb.sqlite'
new_config['Databases']['stats_sqlite']['driver'] = 'weedb.sqlite'
new_config['Databases']['archive_mysql']['driver'] = 'weedb.mysql'
new_config['Databases']['stats_mysql']['driver'] = 'weedb.mysql'
# Get a temporary file:
tmpfile = tempfile.NamedTemporaryFile("w", 1)

View File

@@ -356,12 +356,12 @@ version = 2.0.0b3
[[archive_sqlite]]
root = %(WEEWX_ROOT)s
database = archive/weewx.sdb
driver = db.sqlite
driver = weedb.sqlite
[[stats_sqlite]]
root = %(WEEWX_ROOT)s
database = archive/stats.sdb
driver = db.sqlite
driver = weedb.sqlite
# MySQL databases require setting an appropriate 'user' and 'password'
[[archive_mysql]]
@@ -369,14 +369,14 @@ version = 2.0.0b3
user = weewx
password = weewx
database = weewx
driver = db.mysql
driver = weedb.mysql
[[stats_mysql]]
host = localhost
user = weewx
password = weewx
database = stats
driver = db.mysql
driver = weedb.mysql
############################################################################################