Simplified patching by moving metadata code to DaySummaryManager

This commit is contained in:
Tom Keffer
2017-01-17 12:56:07 -08:00
parent b5fa4321a2
commit 4acf75251d
3 changed files with 48 additions and 62 deletions

View File

@@ -12,7 +12,6 @@ manager.py:
partially patched daily summary is encountered.
- Pass appropriate weighting on to accumulators.
- Remove backfilling code.
- Add c
- Test partial rebuild of daily summaries on both sqlite and MySQL
wee_database:

View File

@@ -187,13 +187,13 @@ class IntervalWeighting(DatabasePatch):
# Check metadata 'Version' value, if its greater than 1.0 we are
# already patched
_daily_summary_version = self.read_metadata('Version')
if _daily_summary_version is None or _daily_summary_version < 2.0:
_daily_summary_version = self.dbm._read_metadata('Version')
if _daily_summary_version is None or _daily_summary_version < '2.0':
# Get the ts of the (start of the) next day to patch; it's the day
# after the ts of the last successfully patched daily summary
_last_patched_ts = self.read_metadata('lastWeightPatch')
_last_patched_ts = self.dbm._read_metadata('lastWeightPatch')
if _last_patched_ts:
_next_day_to_patch_dt = datetime.datetime.fromtimestamp(_last_patched_ts) + datetime.timedelta(days=1)
_next_day_to_patch_dt = datetime.datetime.fromtimestamp(int(_last_patched_ts)) + datetime.timedelta(days=1)
_next_day_to_patch_ts = time.mktime(_next_day_to_patch_dt.timetuple())
else:
_next_day_to_patch_ts = None
@@ -213,7 +213,7 @@ class IntervalWeighting(DatabasePatch):
# a dry run then set the 'Version' metadata field to
# indicate we have patched to version 2.0.
if not self.dry_run:
self.write_metadata('Version', '2.0')
self.dbm._write_metadata('Version', '2.0')
except weewx.ViolatedPrecondition, e:
syslog.syslog(syslog.LOG_INFO,
"intervalweighting: **** %s" % e)
@@ -240,7 +240,7 @@ class IntervalWeighting(DatabasePatch):
self.dbm.backfill_day_summary()
# Set the 'Version' metadata field to indicate we have
# patched to version 2.0
self.write_metadata('Version', '2.0')
self.dbm._write_metadata('Version', '2.0')
syslog.syslog(syslog.LOG_INFO,
"intervalweighting: Successfully applied '%s' patch." % self.name)
else:
@@ -308,7 +308,7 @@ class IntervalWeighting(DatabasePatch):
# 'lastWeightPatch' value in the archive_day__metadata
# table
if not self.dry_run:
self.write_metadata('lastWeightPatch',
self.dbm._write_metadata('lastWeightPatch',
_day_span.start,
_cursor)
# Give the user some information on progress
@@ -472,34 +472,6 @@ class IntervalWeighting(DatabasePatch):
_res = (min_with_none((_res[0], _row[0])), _key)
return _res
def read_metadata(self, metadata):
"""Obtain a metadata value from the archive_day__metadata table.
Returns:
Value of the metadata field. Returns None if no value was found.
"""
select_patch_str = """SELECT value FROM %s_day__metadata """\
"""WHERE name = '%s';"""
_row = self.dbm.getSql(select_patch_str % (self.dbm.table_name,
metadata))
return float(_row[0]) if _row else None
def write_metadata(self, metadata, value, cursor=None):
"""Write a value to a metadata field in the archive_day__metadata table.
Input parameters:
metadata: The name of the metadata field to be written to.
value: The value to be written to the metadata field.
"""
_cursor = cursor or self.dbm.connection.cursor()
meta_replace_str = """REPLACE INTO %s_day__metadata VALUES(?, ?)"""
_cursor.execute(meta_replace_str % self.dbm.table_name,
(metadata, value))
if cursor is None:
_cursor.close()
def do_vacuum(self):
"""Vacuum the database.

View File

@@ -365,7 +365,7 @@ class Manager(object):
self.connection.execute("UPDATE %s SET %s=? WHERE dateTime=?" %
(self.table_name, obs_type), (new_value, timestamp))
def getSql(self, sql, sqlargs=()):
def getSql(self, sql, sqlargs=(), cursor=None):
"""Executes an arbitrary SQL statement on the database.
sql: The SQL statement
@@ -374,12 +374,13 @@ class Manager(object):
returns: a tuple containing the results
"""
_cursor = self.connection.cursor()
_cursor = cursor or self.connection.cursor()
try:
_cursor.execute(sql, sqlargs)
return _cursor.fetchone()
finally:
_cursor.close()
if cursor is None:
_cursor.close()
def genSql(self, sql, sqlargs=()):
"""Generator function that executes an arbitrary SQL statement on
@@ -1056,9 +1057,8 @@ class DaySummaryManager(Manager):
"wsum REAL, sumtime INTEGER);"
meta_create_str = """CREATE TABLE %s_day__metadata (name CHAR(20) NOT NULL UNIQUE PRIMARY KEY, value TEXT);"""
meta_replace_str = """REPLACE INTO %s_day__metadata VALUES(?, ?)"""
select_update_str = """SELECT value FROM %s_day__metadata WHERE name = 'lastUpdate';"""
meta_replace_str = """REPLACE INTO %s_day__metadata VALUES(?, ?)"""
meta_select_str = """SELECT value FROM %s_day__metadata WHERE name=?"""
# Set of SQL statements to be used for calculating aggregate statistics. Key is the aggregation type.
sqlDict = {'min' : "SELECT MIN(min) FROM %(table_name)s_day_%(obs_key)s WHERE dateTime >= %(start)s AND dateTime < %(stop)s",
@@ -1138,7 +1138,7 @@ class DaySummaryManager(Manager):
# Create the meta table:
cursor.execute(DaySummaryManager.meta_create_str % self.table_name)
# Put the version number in it:
cursor.execute(DaySummaryManager.meta_replace_str % self.table_name, ("Version", DaySummaryManager.version))
self._write_metadata('Version', DaySummaryManager.version, cursor)
def _addSingleRecord(self, record, cursor, log_level):
"""Specialized version that updates the daily summaries, as well as the
@@ -1518,6 +1518,7 @@ class DaySummaryManager(Manager):
except weewx.accum.OutOfSpan:
# The record is out of the time span.
# Save the old accumulator:
# TODO: Is passing on setting the last update really necessary here?
self._set_day_summary(_day_accum, None, _cursor)
ndays += 1
# Get a new accumulator:
@@ -1537,8 +1538,7 @@ class DaySummaryManager(Manager):
ndays += 1
# Patch lastUpdate:
if _maxTime:
_cursor.execute(DaySummaryManager.meta_replace_str % self.table_name,
('lastUpdate', str(int(_maxTime))))
self.write_metadata('lastUpdate', str(int(_maxTime)), _cursor)
return (nrecs, ndays)
@@ -1609,29 +1609,44 @@ class DaySummaryManager(Manager):
# If requested, update the time of the last daily summary update:
if lastUpdate is not None:
cursor.execute(DaySummaryManager.meta_replace_str % self.table_name, ('lastUpdate', str(int(lastUpdate))))
self._write_metadata('lastUpdate', str(int(lastUpdate)), cursor)
def _read_metadata(self, key, cursor=None):
"""Obtain a value from the daily summary metadata table.
Returns:
Value of the metadata field. Returns None if no value was found.
"""
_row = self.getSql(DaySummaryManager.meta_select_str % self.table_name, (key,), cursor)
return _row[0] if _row else None
def _write_metadata(self, key, value, cursor=None):
"""Write a value to the daily summary metadata table.
Input parameters:
key: The name of the metadata field to be written to.
value: The value to be written to the metadata field.
"""
_cursor = cursor or self.connection.cursor()
try:
_cursor.execute(DaySummaryManager.meta_replace_str % self.table_name,
(key, value))
finally:
if cursor is None:
_cursor.close()
def _getLastUpdate(self, cursor=None):
"""Returns the time of the last update to the statistical database."""
if cursor:
cursor.execute(DaySummaryManager.select_update_str % self.table_name)
_row = cursor.fetchone()
else:
_row = self.getSql(DaySummaryManager.select_update_str % self.table_name)
return int(_row[0]) if _row else None
return int(self._read_metadata('lastUpdate', cursor))
def _getVersion(self):
def _getVersion(self, cursor=None):
"""Returns a string holding the current version number. Returns None if there is no version number.
"""
cursor = self.connection.cursor()
try:
cursor.execute("""SELECT value FROM %s_day__metadata WHERE name = 'Version';""" % self.table_name)
row = cursor.fetchone()
finally:
cursor.close()
return str(row[0]) if row is not None else None
return self._read_metadata('Version', cursor)
def drop_daily(self):
"""Drop the daily summaries."""