mirror of
https://github.com/weewx/weewx.git
synced 2026-05-24 09:46:19 -04:00
Auto-patch daily summaries on startup.
This commit is contained in:
@@ -39,8 +39,10 @@ usage = """wee_database --help
|
||||
wee_database --drop-daily
|
||||
wee_database --rebuild-daily [--date=YYYY-mm-dd |
|
||||
[--from=YYYY-mm-dd] [--to=YYYY-mm-dd]]
|
||||
[--dry-run]
|
||||
wee_database --reweight [--date=YYYY-mm-dd |
|
||||
[--from=YYYY-mm-dd] [--to=YYYY-mm-dd]]
|
||||
[--dry-run]
|
||||
wee_database --calc-missing [--date=YYYY-mm-dd |
|
||||
[--from=YYYY-mm-dd[THH:MM]] [--to=YYYY-mm-dd[THH:MM]]]
|
||||
wee_database --check-strings
|
||||
@@ -348,9 +350,10 @@ def reweight(config_dict, db_binding, options):
|
||||
print("Recalculating the weighted summaries in database '%s' ..." % database_name)
|
||||
if options.dry_run:
|
||||
print("Dry run. Nothing done.")
|
||||
else:
|
||||
# Do the actual recalculations
|
||||
dbmanager.recalculate_weights(start_d=from_d, stop_d=to_d)
|
||||
|
||||
# Do the actual recalculations
|
||||
dbmanager.recalculate_weights(start_d=from_d, stop_d=to_d)
|
||||
msg = "Finished reweighting in %.1f seconds." % (time.time() - t1)
|
||||
log.info(msg)
|
||||
print(msg)
|
||||
@@ -583,7 +586,7 @@ def update(config_dict, db_binding, options):
|
||||
"""Apply any required database fixes.
|
||||
|
||||
Applies the following fixes:
|
||||
- checks if database version is 2.0, if not interval weighting fix is
|
||||
- checks if database version is 3.0, if not interval weighting fix is
|
||||
applied
|
||||
- recalculates windSpeed daily summary max and maxtime fields from
|
||||
archive
|
||||
@@ -601,36 +604,21 @@ def update(config_dict, db_binding, options):
|
||||
print(msg)
|
||||
return
|
||||
|
||||
if options.dry_run:
|
||||
logging.disable(logging.INFO)
|
||||
|
||||
msg = "Preparing Interval Weighting Fix..."
|
||||
log.info(msg)
|
||||
print(msg)
|
||||
|
||||
# notify if this is a dry run
|
||||
if options.dry_run:
|
||||
print("This is a dry run: weighted intervals will be calculated but not saved.")
|
||||
|
||||
# Get a database manager object
|
||||
dbm = weewx.manager.open_manager_with_config(config_dict, db_binding)
|
||||
|
||||
# Interval weighting
|
||||
# first construct an interval weighting config dict
|
||||
weighting_config_dict = {'name': 'Interval Weighting Fix',
|
||||
'binding': db_binding,
|
||||
'trans_days': 100,
|
||||
'dry_run': options.dry_run}
|
||||
|
||||
# check the daily summary version
|
||||
_daily_summary_version = dbm._read_metadata('Version')
|
||||
msg = "Daily summary tables are at version %s" % _daily_summary_version
|
||||
msg = "Daily summary tables are at version %s" % dbm.version
|
||||
log.info(msg)
|
||||
print(msg)
|
||||
|
||||
if _daily_summary_version is not None and _daily_summary_version >= '2.0':
|
||||
if dbm.version is not None and dbm.version >= '3.0':
|
||||
# interval weighting fix has been applied
|
||||
msg = "Interval Weighting Fix is not required."
|
||||
msg = "Interval weighting fix is not required."
|
||||
log.info(msg)
|
||||
print(msg)
|
||||
else:
|
||||
@@ -639,16 +627,16 @@ def update(config_dict, db_binding, options):
|
||||
log.info(msg)
|
||||
print(msg)
|
||||
t1 = time.time()
|
||||
dbm.recalculate_weights()
|
||||
if options.dry_run:
|
||||
print("Dry run. Nothing done")
|
||||
else:
|
||||
dbm.update()
|
||||
msg = "Interval Weighting Fix completed in %0.2f seconds." % (time.time() - t1)
|
||||
log.info(msg)
|
||||
print(msg)
|
||||
|
||||
# recalc the max/maxtime windSpeed values
|
||||
_fix_wind(config_dict, db_binding, options)
|
||||
# just in case, set the syslog level back where we found it
|
||||
if options.dry_run:
|
||||
logging.disable(logging.NOTSET)
|
||||
|
||||
|
||||
def calc_missing(config_dict, db_binding, options):
|
||||
|
||||
@@ -340,340 +340,10 @@ class WindSpeedRecalculation(DatabaseFix):
|
||||
sys.stdout.flush()
|
||||
|
||||
|
||||
# ============================================================================
|
||||
# class IntervalWeighting
|
||||
# ============================================================================
|
||||
|
||||
|
||||
class IntervalWeighting(DatabaseFix):
|
||||
"""Class to apply an interval based weight factor to the daily summaries.
|
||||
To apply the interval weight factor:
|
||||
|
||||
1. Create a dictionary of parameters required by the fix. The
|
||||
IntervalWeighting class uses the following parameters as indicated:
|
||||
|
||||
name: Name of the class defining the fix, for the interval
|
||||
weighting fix this is 'Interval Weighting'. String.
|
||||
Mandatory.
|
||||
|
||||
binding: The binding of the database to be fixed. Default is
|
||||
the binding specified in weewx.conf [StdArchive].
|
||||
String, eg 'binding_name'. Optional.
|
||||
|
||||
trans_days: Number of days to be fixed in each database
|
||||
transaction. Integer, default is 50. Optional.
|
||||
|
||||
dry_run: Process the fix as if it was being applied but do not
|
||||
write to the database. Boolean, default is True.
|
||||
Optional.
|
||||
|
||||
2. Create an IntervalWeighting object passing it a weewx config dict and a
|
||||
fix config dict.
|
||||
|
||||
3. Call the resulting object's run() method to apply the fix.
|
||||
"""
|
||||
|
||||
def __init__(self, config_dict, fix_config_dict):
|
||||
"""Initialise our IntervalWeighting object."""
|
||||
|
||||
# call our parents __init__
|
||||
super(IntervalWeighting, self).__init__(config_dict, fix_config_dict)
|
||||
|
||||
# Number of days per db transaction, default to 50.
|
||||
self.trans_days = int(fix_config_dict.get('trans_days', 50))
|
||||
|
||||
def run(self):
|
||||
"""Main entry point for applying the interval weighting fix.
|
||||
|
||||
Check archive records of unweighted days to see if each day of records
|
||||
has a unique interval value. If interval value is unique then apply the
|
||||
weighting. Catch any exceptions and raise as necessary. If any one day
|
||||
has multiple interval value then we cannot weight the daily summaries,
|
||||
instead rebuild the daily summaries.
|
||||
"""
|
||||
|
||||
# first do some logging about what we will do
|
||||
if self.dry_run:
|
||||
log.info("intervalweighting: This is a dry run. "
|
||||
"Interval weighting will be applied but not saved.")
|
||||
|
||||
log.info("intervalweighting: Using database binding '%s', "
|
||||
"which is bound to database '%s'." %
|
||||
(self.binding, self.dbm.database_name))
|
||||
log.debug("intervalweighting: Database transactions "
|
||||
"will use %s days of data." % self.trans_days)
|
||||
# Check metadata 'Version' value, if its greater than 1.0 we are
|
||||
# already weighted
|
||||
_daily_summary_version = self.dbm._read_metadata('Version')
|
||||
if _daily_summary_version is None or _daily_summary_version < '2.0':
|
||||
# Get the ts of the (start of the) next day to weight; it's the day
|
||||
# after the ts of the last successfully weighted daily summary
|
||||
_last_patched_ts = self.dbm._read_metadata('lastWeightPatch')
|
||||
if _last_patched_ts:
|
||||
_next_day_to_patch_dt = datetime.datetime.fromtimestamp(int(_last_patched_ts)) \
|
||||
+ datetime.timedelta(days=1)
|
||||
_next_day_to_patch_ts = time.mktime(_next_day_to_patch_dt.timetuple())
|
||||
else:
|
||||
_next_day_to_patch_ts = None
|
||||
# Check to see if any days that need to be weighted have multiple
|
||||
# distinct interval values
|
||||
if self.unique_day_interval(_next_day_to_patch_ts):
|
||||
# We have a homogeneous intervals for each day so we can weight
|
||||
# the daily summaries.
|
||||
|
||||
# Now apply the weighting but be prepared to catch any
|
||||
# exceptions
|
||||
try:
|
||||
self.do_fix(_next_day_to_patch_ts)
|
||||
# If we arrive here the fix was applied, if this is not
|
||||
# a dry run then set the 'Version' metadata field to
|
||||
# indicate we have updated to version 2.0.
|
||||
if not self.dry_run:
|
||||
with weedb.Transaction(self.dbm.connection) as _cursor:
|
||||
self.dbm._write_metadata('Version', '2.0', _cursor)
|
||||
except weewx.ViolatedPrecondition as e:
|
||||
log.info("intervalweighting: %s not applied: %s"
|
||||
% (self.name, e))
|
||||
# raise the error so caller can deal with it if they want
|
||||
raise
|
||||
else:
|
||||
# At least one day that needs to be weighted has multiple
|
||||
# distinct interval values. We cannot apply the weighting by
|
||||
# manipulating the existing daily summaries so we will weight
|
||||
# by rebuilding the daily summaries. Rebuild is destructive so
|
||||
# only do it if this is not a dry run
|
||||
if not self.dry_run:
|
||||
log.debug("intervalweighting: Multiple distinct 'interval' "
|
||||
"values found for at least one archive day.")
|
||||
log.info("intervalweighting: %s will be applied by dropping "
|
||||
"and rebuilding daily summaries." % self.name)
|
||||
self.dbm.drop_daily()
|
||||
self.dbm.close()
|
||||
# Reopen to force rebuilding of the schema
|
||||
self.dbm = weewx.manager.open_manager_with_config(self.config_dict,
|
||||
self.binding,
|
||||
initialize=True)
|
||||
# This will rebuild to a V2 daily summary
|
||||
self.dbm.backfill_day_summary()
|
||||
else:
|
||||
# daily summaries are already weighted
|
||||
log.info("intervalweighting: %s has already been applied." % self.name)
|
||||
|
||||
def do_fix(self, np_ts):
|
||||
"""Apply the interval weighting fix to the daily summaries."""
|
||||
|
||||
# do we need to weight? Only weight if next day to weight ts is None or
|
||||
# there are records in the archive from that day
|
||||
if np_ts is None or self.dbm.last_timestamp > np_ts:
|
||||
t1 = time.time()
|
||||
log.info("intervalweighting: Applying %s..." % self.name)
|
||||
_days = 0
|
||||
# Get the earliest daily summary ts and the obs that it came from
|
||||
first_ts, obs = self.first_summary()
|
||||
# Get the start and stop ts for our first transaction days
|
||||
_tr_start_ts = np_ts if np_ts is not None else first_ts
|
||||
_tr_stop_dt = datetime.datetime.fromtimestamp(_tr_start_ts) \
|
||||
+ datetime.timedelta(days=self.trans_days)
|
||||
_tr_stop_ts = time.mktime(_tr_stop_dt.timetuple())
|
||||
_tr_stop_ts = min(startOfDay(self.dbm.last_timestamp), _tr_stop_ts)
|
||||
last_start = None
|
||||
while True:
|
||||
with weedb.Transaction(self.dbm.connection) as _cursor:
|
||||
for _day_span in self.genSummaryDaySpans(_tr_start_ts, _tr_stop_ts, obs):
|
||||
# Get the weight to be applied for the day
|
||||
_weight = self.get_interval(_day_span) * 60
|
||||
# Get the current day stats in an accumulator
|
||||
_day_accum = self.dbm._get_day_summary(_day_span.start)
|
||||
# Set the unit system for the accumulator
|
||||
_day_accum.unit_system = self.dbm.std_unit_system
|
||||
# Weight the necessary accumulator stats, use a
|
||||
# try..except in case something goes wrong
|
||||
last_key = None
|
||||
try:
|
||||
for _day_key in self.dbm.daykeys:
|
||||
last_key = _day_key
|
||||
_day_accum[_day_key].wsum *= _weight
|
||||
_day_accum[_day_key].sumtime *= _weight
|
||||
# Do we have a vecstats accumulator?
|
||||
if hasattr(_day_accum[_day_key], 'wsquaresum'):
|
||||
# Yes, so update the weighted vector stats
|
||||
_day_accum[_day_key].wsquaresum *= _weight
|
||||
_day_accum[_day_key].xsum *= _weight
|
||||
_day_accum[_day_key].ysum *= _weight
|
||||
_day_accum[_day_key].dirsumtime *= _weight
|
||||
except Exception as e:
|
||||
# log the exception and re-raise it
|
||||
log.info("intervalweighting: Interval weighting of '%s' daily summary "
|
||||
"for %s failed: %s"
|
||||
% (last_key, timestamp_to_string(_day_span.start,
|
||||
format_str="%Y-%m-%d"), e))
|
||||
raise
|
||||
# Update the daily summary with the weighted accumulator
|
||||
if not self.dry_run:
|
||||
self.dbm._set_day_summary(_day_accum, None, _cursor)
|
||||
_days += 1
|
||||
# Save the ts of the weighted daily summary as the
|
||||
# 'lastWeightPatch' value in the archive_day__metadata
|
||||
# table
|
||||
if not self.dry_run:
|
||||
self.dbm._write_metadata('lastWeightPatch',
|
||||
_day_span.start,
|
||||
_cursor)
|
||||
# Give the user some information on progress
|
||||
if _days % 50 == 0:
|
||||
self._progress(_days, _day_span.start)
|
||||
last_start = _day_span.start
|
||||
|
||||
# Setup our next tranche
|
||||
# Have we reached the end, if so break to finish
|
||||
if _tr_stop_ts >= startOfDay(self.dbm.last_timestamp):
|
||||
break
|
||||
# More to process so set our start and stop for the next
|
||||
# transaction
|
||||
_tr_start_dt = datetime.datetime.fromtimestamp(_tr_stop_ts) \
|
||||
+ datetime.timedelta(days=1)
|
||||
_tr_start_ts = time.mktime(_tr_start_dt.timetuple())
|
||||
_tr_stop_dt = datetime.datetime.fromtimestamp(_tr_start_ts) \
|
||||
+ datetime.timedelta(days=self.trans_days)
|
||||
_tr_stop_ts = time.mktime(_tr_stop_dt.timetuple())
|
||||
_tr_stop_ts = min(self.dbm.last_timestamp, _tr_stop_ts)
|
||||
|
||||
# We have finished. Get rid of the no longer needed lastWeightPatch
|
||||
with weedb.Transaction(self.dbm.connection) as _cursor:
|
||||
_cursor.execute("DELETE FROM %s_day__metadata WHERE name=?"
|
||||
% self.dbm.table_name, ('lastWeightPatch',))
|
||||
|
||||
# Give the user some final information on progress,
|
||||
# mainly so the total tallies with the log
|
||||
self._progress(_days, last_start)
|
||||
print(file=sys.stdout)
|
||||
tdiff = time.time() - t1
|
||||
# We are done so log and inform the user
|
||||
log.info("intervalweighting: Calculated weighting "
|
||||
"for %s days in %0.2f seconds." % (_days, tdiff))
|
||||
if self.dry_run:
|
||||
log.info("intervalweighting: "
|
||||
"This was a dry run. %s was not applied." % self.name)
|
||||
else:
|
||||
# we didn't need to weight so inform the user
|
||||
log.info("intervalweighting: %s has already been applied." % self.name)
|
||||
|
||||
def get_interval(self, span):
|
||||
"""Return the interval field value used in a span.
|
||||
|
||||
Gets the interval field value from a TimeSpan of records. Raises a
|
||||
weewx.ViolatedPrecondition error if the interval field value could not
|
||||
be determined.
|
||||
|
||||
Input parameters:
|
||||
span: TimesSpan object of the period from which to determine
|
||||
the interval value.
|
||||
|
||||
Returns:
|
||||
The interval field value in minutes, if no interval field values
|
||||
are found then a weewx.ViolatedPrecondition error is raised.
|
||||
"""
|
||||
|
||||
_row = self.dbm.getSql(
|
||||
"SELECT `interval` FROM %s WHERE dateTime > ? AND dateTime <= ?;"
|
||||
% self.dbm.table_name, span)
|
||||
try:
|
||||
return _row[0]
|
||||
except IndexError:
|
||||
_msg = "'interval' field not found in archive day %s." % span
|
||||
raise weewx.ViolatedPrecondition(_msg)
|
||||
|
||||
def unique_day_interval(self, timestamp):
|
||||
"""Check a weewx archive for homogeneous interval values for each day.
|
||||
|
||||
An 'archive day' of records includes all records whose dateTime value
|
||||
is greater than midnight at the start of the day and less than or equal
|
||||
to midnight at the end of the day. Each 'archive day' of records is
|
||||
checked for multiple distinct interval values.
|
||||
|
||||
Input parameters:
|
||||
timestamp: check archive days containing timestamp and later. If
|
||||
None then all archive days are checked.
|
||||
|
||||
Returns:
|
||||
True if all checked archive days have a single distinct interval
|
||||
value. Otherwise returns False (ie if more than one distinct
|
||||
interval value is found in any one archive day).
|
||||
"""
|
||||
|
||||
t1 = time.time()
|
||||
log.debug("intervalweighting: Checking table '%s' for multiple "
|
||||
"'interval' values per day..." % self.dbm.table_name)
|
||||
start_ts = timestamp if timestamp else self.dbm.first_timestamp
|
||||
_days = 0
|
||||
_result = True
|
||||
for _day_span in weeutil.weeutil.genDaySpans(start_ts,
|
||||
self.dbm.last_timestamp):
|
||||
_row = self.dbm.getSql("SELECT MIN(`interval`),MAX(`interval`) FROM %s "
|
||||
"WHERE dateTime > ? AND dateTime <= ?;"
|
||||
% self.dbm.table_name, _day_span)
|
||||
try:
|
||||
# If MIN and MAX are the same then we only have 1 distinct
|
||||
# value. If the query returns nothing then that is fine too,
|
||||
# probably no archive data for that day.
|
||||
_result = _row[0] == _row[1] if _row else True
|
||||
except IndexError:
|
||||
# Something is seriously amiss, raise an error
|
||||
raise weewx.ViolatedPrecondition("Invalid 'interval' data "
|
||||
"detected in archive day %s." % _day_span)
|
||||
_days += 1
|
||||
if not _result:
|
||||
break
|
||||
if _result:
|
||||
log.debug("intervalweighting: Successfully checked %s days "
|
||||
"for multiple 'interval' values in %0.2f seconds."
|
||||
% (_days, (time.time() - t1)))
|
||||
return _result
|
||||
|
||||
def first_summary(self):
|
||||
"""Obtain the timestamp and observation name of the earliest daily
|
||||
summary entry.
|
||||
|
||||
It is possible the earliest dateTime value in the daily summary tables
|
||||
will be different from table to table. To find the earliest dateTime
|
||||
value we loop through each daily summary table finding the earliest
|
||||
dateTime value for each table and then take the earliest value of these.
|
||||
|
||||
Returns:
|
||||
A tuple of the form (timestamp, observation)
|
||||
|
||||
where:
|
||||
|
||||
timestamp: The earliest timestamp across all daily summary
|
||||
tables
|
||||
observation: The observation whose daily summary table has the
|
||||
earliest timestamp
|
||||
|
||||
(None, None) is returned if no dateTime values were found.
|
||||
"""
|
||||
|
||||
_res = (None, None)
|
||||
for _key in self.dbm.daykeys:
|
||||
_ts = self.first_summary_ts(_key)
|
||||
if _ts:
|
||||
_res = (weeutil.weeutil.min_with_none((_res[0], _ts)), _key)
|
||||
return _res
|
||||
|
||||
@staticmethod
|
||||
def _progress(ndays, last_time):
|
||||
"""Utility function to show our progress while processing the fix."""
|
||||
|
||||
print("Weighting daily summary: %d; Timestamp: %s\r"
|
||||
% (ndays, timestamp_to_string(last_time, format_str="%Y-%m-%d")),
|
||||
end='', file=sys.stdout)
|
||||
sys.stdout.flush()
|
||||
|
||||
|
||||
# ============================================================================
|
||||
# class CalcMissing
|
||||
# ============================================================================
|
||||
|
||||
|
||||
class CalcMissing(DatabaseFix):
|
||||
"""Class to calculate and store missing derived observations.
|
||||
|
||||
|
||||
@@ -22,6 +22,7 @@ create user 'weewx'@'localhost' identified by 'weewx';
|
||||
create user 'weewx1'@'localhost' identified by 'weewx1';
|
||||
create user 'weewx2'@'localhost' identified by 'weewx2';
|
||||
grant select, update, create, delete, drop, insert on weewx.* to 'weewx'@'localhost';
|
||||
grant select, update, create, delete, drop, insert on weewx_scratch.* to 'weewx'@'localhost';
|
||||
grant select, update, create, delete, drop, insert on test.* to 'weewx'@'localhost';
|
||||
grant select, update, create, delete, drop, insert on test_weedb.* to 'weewx'@'localhost';
|
||||
grant select, update, create, delete, drop, insert on test_alt_weewx.* to 'weewx'@'localhost';
|
||||
|
||||
@@ -94,8 +94,9 @@ class Manager(object):
|
||||
# Try again:
|
||||
self.sqlkeys = self.connection.columnsOf(self.table_name)
|
||||
|
||||
# Set up cached data:
|
||||
self._sync()
|
||||
# Set up cached data. Make sure to call my version, not any subclass's version. This is
|
||||
# because the subclass has not been initialized yet.
|
||||
Manager._sync(self)
|
||||
|
||||
@classmethod
|
||||
def open(cls, database_dict, table_name='archive'):
|
||||
@@ -213,8 +214,8 @@ class Manager(object):
|
||||
log.info("Created and initialized table '%s' in database '%s'",
|
||||
self.table_name, self.database_name)
|
||||
|
||||
def _sync(self):
|
||||
"""Resynch the internal caches."""
|
||||
def _create_sync(self):
|
||||
"""Create the internal caches."""
|
||||
|
||||
# Fetch the first row in the database to determine the unit system in use. If the database
|
||||
# has never been used, then the unit system is still indeterminate --- set it to 'None'.
|
||||
@@ -225,6 +226,9 @@ class Manager(object):
|
||||
self.first_timestamp = self.firstGoodStamp()
|
||||
self.last_timestamp = self.lastGoodStamp()
|
||||
|
||||
def _sync(self):
|
||||
Manager._create_sync(self)
|
||||
|
||||
def lastGoodStamp(self):
|
||||
"""Retrieves the epoch time of the last good archive record.
|
||||
|
||||
@@ -278,7 +282,7 @@ class Manager(object):
|
||||
max_ts = max(max_ts, record['dateTime'])
|
||||
except (weedb.IntegrityError, weedb.OperationalError) as e:
|
||||
log.error("Unable to add record %s to database '%s': %s",
|
||||
weeutil.weeutil.timestamp_to_string(record['dateTime']),
|
||||
timestamp_to_string(record['dateTime']),
|
||||
self.database_name, e)
|
||||
|
||||
# Update the cached timestamps. This has to sit outside the transaction context,
|
||||
@@ -320,7 +324,7 @@ class Manager(object):
|
||||
sql_insert_stmt = "INSERT INTO %s (%s) VALUES (%s)" % (self.table_name, k_str, q_str)
|
||||
cursor.execute(sql_insert_stmt, value_list)
|
||||
log.info("Added record %s to database '%s'",
|
||||
weeutil.weeutil.timestamp_to_string(record['dateTime']),
|
||||
timestamp_to_string(record['dateTime']),
|
||||
self.database_name)
|
||||
|
||||
def _updateHiLo(self, accumulator, cursor):
|
||||
@@ -712,9 +716,9 @@ def show_progress(last_time, nrec=None):
|
||||
"""Utility function to show our progress"""
|
||||
if nrec:
|
||||
msg = "Records processed: %d; time: %s\r" \
|
||||
% (nrec, weeutil.weeutil.timestamp_to_string(last_time))
|
||||
% (nrec, timestamp_to_string(last_time))
|
||||
else:
|
||||
msg = "Processed through: %s\r" % weeutil.weeutil.timestamp_to_string(last_time)
|
||||
msg = "Processed through: %s\r" % timestamp_to_string(last_time)
|
||||
print(msg, end='', file=sys.stdout)
|
||||
sys.stdout.flush()
|
||||
|
||||
@@ -762,7 +766,7 @@ class DaySummaryManager(Manager):
|
||||
update.
|
||||
"""
|
||||
|
||||
version = "2.0"
|
||||
version = "3.0"
|
||||
|
||||
# Schemas used by the daily summaries:
|
||||
day_schemas = {
|
||||
@@ -821,6 +825,17 @@ class DaySummaryManager(Manager):
|
||||
# Database has not been initialized. Initialize it:
|
||||
self._initialize_day_tables(schema)
|
||||
|
||||
self.version = None
|
||||
self.daykeys = None
|
||||
DaySummaryManager._create_sync(self)
|
||||
self.patch_sums()
|
||||
|
||||
def close(self):
|
||||
self.version = None
|
||||
self.daykeys = None
|
||||
super(DaySummaryManager, self).close()
|
||||
|
||||
def _create_sync(self):
|
||||
# Get a list of all the observation types which have daily summaries
|
||||
all_tables = self.connection.tables()
|
||||
prefix = "%s_day_" % self.table_name
|
||||
@@ -828,15 +843,15 @@ class DaySummaryManager(Manager):
|
||||
meta_name = '%s_day__metadata' % self.table_name
|
||||
self.daykeys = [x[n_prefix:] for x in all_tables
|
||||
if (x.startswith(prefix) and x != meta_name)]
|
||||
|
||||
self.version = self._read_metadata('Version')
|
||||
if self.version is None:
|
||||
self.version = '1.0'
|
||||
log.debug('Daily summary version is %s', self.version)
|
||||
|
||||
def close(self):
|
||||
self.version = None
|
||||
self.daykeys = None
|
||||
super(DaySummaryManager, self).close()
|
||||
def _sync(self):
|
||||
super(DaySummaryManager, self)._sync()
|
||||
self._create_sync()
|
||||
|
||||
def _initialize_day_tables(self, schema):
|
||||
"""Initialize the tables needed for the daily summary."""
|
||||
@@ -902,7 +917,7 @@ class DaySummaryManager(Manager):
|
||||
_day_summary.addRecord(record, weight=_weight)
|
||||
self._set_day_summary(_day_summary, record['dateTime'], cursor)
|
||||
log.info("Added record %s to daily summary in '%s'",
|
||||
weeutil.weeutil.timestamp_to_string(record['dateTime']),
|
||||
timestamp_to_string(record['dateTime']),
|
||||
self.database_name)
|
||||
|
||||
def _updateHiLo(self, accumulator, cursor):
|
||||
@@ -1091,6 +1106,26 @@ class DaySummaryManager(Manager):
|
||||
|
||||
return nrecs, ndays
|
||||
|
||||
def drop_daily(self):
|
||||
"""Drop the daily summaries."""
|
||||
|
||||
log.info("Dropping daily summary tables from '%s' ...", self.connection.database_name)
|
||||
try:
|
||||
_all_tables = self.connection.tables()
|
||||
with weedb.Transaction(self.connection) as _cursor:
|
||||
for _table_name in _all_tables:
|
||||
if _table_name.startswith('%s_day_' % self.table_name):
|
||||
_cursor.execute("DROP TABLE %s" % _table_name)
|
||||
|
||||
self.daykeys = None
|
||||
except weedb.OperationalError as e:
|
||||
log.error("Drop daily summary tables failed for database '%s': %s",
|
||||
self.connection.database_name, e)
|
||||
raise
|
||||
else:
|
||||
log.info("Dropped daily summary tables from database '%s'",
|
||||
self.connection.database_name)
|
||||
|
||||
def recalculate_weights(self, start_d=None, stop_d=None,
|
||||
tranche_size=100, progress_fn=show_progress):
|
||||
"""Recalculate just the daily summary weights.
|
||||
@@ -1292,6 +1327,32 @@ class DaySummaryManager(Manager):
|
||||
'squaresum', 'wsquaresum']]))
|
||||
cursor.execute(wind_update, (start_ts,))
|
||||
|
||||
def patch_sums(self):
|
||||
"""Version 4.2 accidentally interpreted V2.0 daily sums as V1.0, so the weighted sums
|
||||
were all given a weight of 1.0, instead of the interval length. This fixes that."""
|
||||
if '1.0' < self.version < '3.0':
|
||||
msg = "Daily summaries at V%s. Patching to V%s" \
|
||||
% (self.version, DaySummaryManager.version)
|
||||
print(msg)
|
||||
log.info(msg)
|
||||
# We need to upgrade from V2.0 to V3.0. The only difference is that the
|
||||
# patch has been applied to V3.0 daily summaries. The patch need only be
|
||||
# done from a date well before the V4.2 release. We pick 1-Jun-2020.
|
||||
self.recalculate_weights(start_d=datetime.date(2020,6,1))
|
||||
self._write_metadata('Version', DaySummaryManager.version)
|
||||
self.version = DaySummaryManager.version
|
||||
log.info("Patch finished.")
|
||||
|
||||
def update(self):
|
||||
"""Update the database to V3.0"""
|
||||
if self.version == '1.0':
|
||||
self.recalculate_weights()
|
||||
self._write_metadata('Version', DaySummaryManager.version)
|
||||
self.version = DaySummaryManager.version
|
||||
elif self.version == '2.0':
|
||||
self.patch_sums()
|
||||
|
||||
|
||||
# --------------------------- UTILITY FUNCTIONS -----------------------------------
|
||||
|
||||
def get_first_last(self):
|
||||
@@ -1425,26 +1486,6 @@ class DaySummaryManager(Manager):
|
||||
if cursor is None:
|
||||
_cursor.close()
|
||||
|
||||
def drop_daily(self):
|
||||
"""Drop the daily summaries."""
|
||||
|
||||
log.info("Dropping daily summary tables from '%s' ...", self.connection.database_name)
|
||||
try:
|
||||
_all_tables = self.connection.tables()
|
||||
with weedb.Transaction(self.connection) as _cursor:
|
||||
for _table_name in _all_tables:
|
||||
if _table_name.startswith('%s_day_' % self.table_name):
|
||||
_cursor.execute("DROP TABLE %s" % _table_name)
|
||||
|
||||
self.daykeys = None
|
||||
except weedb.OperationalError as e:
|
||||
log.error("Drop daily summary tables failed for database '%s': %s",
|
||||
self.connection.database_name, e)
|
||||
raise
|
||||
else:
|
||||
log.info("Dropped daily summary tables from database '%s'",
|
||||
self.connection.database_name)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
import doctest
|
||||
|
||||
76
bin/weewx/tests/test_manager.py
Normal file
76
bin/weewx/tests/test_manager.py
Normal file
@@ -0,0 +1,76 @@
|
||||
#
|
||||
# Copyright (c) 2009-2020 Tom Keffer <tkeffer@gmail.com>
|
||||
#
|
||||
# See the file LICENSE.txt for your full rights.
|
||||
#
|
||||
"""Test the daily sums in the daily summary.
|
||||
|
||||
Ordinarily, these are tested in test_daily. However, gen_fake_data() speeds things up by inserting
|
||||
a bunch of records in the archive table *then* building the daily summary. This does not test
|
||||
adding things on the fly.
|
||||
"""
|
||||
import datetime
|
||||
import logging
|
||||
import os
|
||||
import time
|
||||
import unittest
|
||||
|
||||
import gen_fake_data
|
||||
import weeutil.logger
|
||||
import schemas.wview_small
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
weeutil.logger.setup('test_manager', {})
|
||||
os.environ['TZ'] = 'America/Los_Angeles'
|
||||
time.tzset()
|
||||
|
||||
import weewx.manager
|
||||
|
||||
# Things go *much* faster if we use an abbreviated schema
|
||||
schema = schemas.wview_small.schema
|
||||
|
||||
# Archive interval of one hour
|
||||
interval = 3600
|
||||
# Fourteen days' worth of data (2019-06-01 up to midnight at the start of 2019-06-15).
|
||||
start_dt = datetime.date(2019, 6, 1)
|
||||
stop_dt = datetime.date(2019, 6, 15)
|
||||
start_ts = int(time.mktime(start_dt.timetuple())) + interval
|
||||
stop_ts = int(time.mktime(stop_dt.timetuple()))
|
||||
|
||||
|
||||
class TestWeights(unittest.TestCase):
|
||||
"""Test that inserting records get the weighted sums right. Regression test for issue #623.
|
||||
SQLite only --- it is so much faster.
|
||||
"""
|
||||
|
||||
def setUp(self):
|
||||
"""Set up an in-memory sqlite database."""
|
||||
self.db_manager = weewx.manager.DaySummaryManager.open_with_create(
|
||||
{
|
||||
'database_name': ':memory:',
|
||||
'driver': 'weedb.sqlite'
|
||||
},
|
||||
schema=schema)
|
||||
|
||||
# Populate the database
|
||||
for record in gen_fake_data.genFakeRecords(start_ts, stop_ts, interval=interval):
|
||||
self.db_manager.addRecord(record)
|
||||
|
||||
def tearDown(self):
|
||||
pass
|
||||
|
||||
def test_weights(self):
|
||||
for key in self.db_manager.daykeys:
|
||||
archive_key = key if key != 'wind' else 'windSpeed'
|
||||
result1 = self.db_manager.getSql("SELECT COUNT(%s) FROM archive" % archive_key)
|
||||
result2 = self.db_manager.getSql("SELECT SUM(count) FROM archive_day_%s;" % key)
|
||||
self.assertEqual(result1, result2)
|
||||
result3 = self.db_manager.getSql("SELECT COUNT(%s) * %d FROM archive"
|
||||
% (archive_key, interval))
|
||||
result4 = self.db_manager.getSql("SELECT SUM(sumtime) FROM archive_day_%s" % key)
|
||||
self.assertEqual(result3, result4)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
||||
@@ -3,6 +3,10 @@ weewx change history
|
||||
|
||||
4.2.1 MM/DD/YYYY
|
||||
|
||||
Version 4.2.0 had a bug, which caused the sums in the daily summary to be
|
||||
incorrectly calculated. This version includes a patch to fix the problem. It
|
||||
runs once at startup. Fixes issue #623.
|
||||
|
||||
Service weewx.wxxtypes.StdDelta was inadvertently left out of the list of
|
||||
services to be run. Fortunately, it is not used. Yet. Added it back in.
|
||||
|
||||
@@ -13,8 +17,6 @@ last windDir value.
|
||||
Fixed problem when looking up stars with more than one word in their name.
|
||||
Fixes issue #620.
|
||||
|
||||
Fixed problem that caused daily summary weights to revert to V1.0 style weights.
|
||||
|
||||
|
||||
4.2.0 10/26/2020
|
||||
|
||||
|
||||
Reference in New Issue
Block a user