Files
weewx/bin/weecfg/database.py
Bill Richter 93436a927c once again try to update my fork (#2)
* Got test suites working again.

* Updated TODO

* indicate specific firmware for cc3000

* clarify rainwise firmware

* Further refinement

- max and maxtime now updated
- reworked the progress function, now a method in the class

Have left __main__ code in that has been used for testing windSpeed
recalculation fix

* indicate when calibrations are ignored

* Now emits the barometer trend in LOOP packets as field 'trendIcon'

Vantage only.

* Updated TODO guide, reflecting this morning's phone call.

* Presses on, despite database error

* Simplified patching by moving metadata code to DaySummaryManager

* Got rid of _getVersion()

Can now get it from _read_metadata

* Remove interval weighting fix from weewx startup

Refer TODO.txt

* Update TODO.txt

* added write timeout to ultimeter driver

* better logging when cc3000 loses contact with sensors

* document some of the channel, sensor, and logger specifics for the wmr100/wmr200 hardware

* Removed the p word, patch.py is now database.py, rejigged wee_database as per skype and followup emails

- progress function for fixes included as a method in base class that
can be overridden
- much changing of logging to give consistent results/output
- believe I retained Tom's recent changes to patch.py (commit 4acf752)

* Picks daily summary weights on the basis of database version

* Rewrote the backfill routine (again)

* wee_database now uses new version of backfill_day_summary

* Fixed problem that prevented cold startup

* Revised to reflect latest wee_database incantation

* Modified weighting fix

Should not mess with lessUpdate
Delete lastWeightPatch after successful patch

* Check to make sure the daily summaries have not been partially updated.

* Revised to reflect latest wee_database incantation

* Revised wee_database and interval weighting paras, added windSpeed recalc para

* Removed vacuum

* Updated what has been done and committed

* Better diagnostics with partial update of the daily summaries.

* MySQL now uses transaction isolation level READ COMMITTED

* Add .config (#204)

* Changed semantics of "$last_xxx" tags.

* Template test updates including fixing issue #201 (#205)

* Add .config

* Standardise test skin for index.html.tml to remove "%x %X" locale dependent formatting. Note: Used 24 hour time as AM/PM can also be locale dependent.

Also include fixes for line formatting in some other test files and expected results.

* sysctlbyname is not available on every platform, so catch AttributeError too, otherwise cheetah fails with a name error

* Got rid of tabs

* update usb mode info for acurite models

* Removed intervalgenRoundTS and archiveDaysAgoSpan.

* Simplified tags. Got template test working again.

* Somehow, style "indent" got lost

* Documented new "$ago" tags.

* Documented $ago options, as well as .start and .end

* Update changes doc

* Stupid typo

* use markdown instead of html for README

* indenting seems to confuse markdown's handling of urls

* Left over $last_day in $spans example (#206)

* Add .config

* Standardise test skin for index.html.tml to remove "%x %X" locale dependent formatting. Note: Used 24 hour time as AM/PM can also be locale dependent.

Also include fixes for line formatting in some other test files and expected results.

* Left over $last_day in $spans example.

* a user-specified sensor_map will update, not replace, the default sensor_map

* Updated the upgrade guide.

* ensure that weewx-multi works on deb and rpm as well as setup.py

* document the sensor mapping changes

* Added comprehensive example to customizing guide.

* Updated TODO

* Got the weighting update to work on MySQL

* added crude caching of pressure in wmr300

* Fixed typo

* Fixed little errors. Consistently use "tag", instead of "dot code."

* no altimeter, just pressure and barometer

* Fixed little HTML problems.

* Reformatted, in anticipation of a refactoring of some sections.

* Fixed location of 'Version'

So it works with automatic replacement of 'Version'

* initial experiment with different fonts

* override jquery-ui hijacking of toc font family

* bring header highlighting to h2 to correlate with toc.  reduce post-header margins.

* fix some table header cruft.  prevent indent sections from overflowing right margin.  let the table cells breath.

* fix broken tty formatting

* Checkpoint

Work in progress.

* Refined examples of creating new units and groups

* Polishing. Or, maybe fiddling?

* minor css fixes.  rearrange troubleshooting sections about pressure.

* minor cleanup to readme

* Fixed test suites

Add MySQL back to template tests.

* Reworked the iteration examples.

* Corrected and clarified the units used in the "electricity" example.

* remove write_timeout since naming is inconsistent between pyserial versions and there is no backward compatibility

* remove write_timeout from ultimeter

* Minor changes to users guide

* increase body size to 100%.  background for code.  weeWX in titles.  true bold for monospace.  prep for direct font comparisons.

* Clarified the role of encoding

* update utilities guide with suggested wee_database descriptions and args

* more compaction

* increase margins on html examples.  use droid serif for html examples.

* Clarified a few things

* decode weewx into weeWX

* include transaction limit defaults

* eliminate transaction-limit

* weeWX fixes in install pages.  more fixes to utilities.

* missed a few code spans

* fix version label alignment

* use only major.minor for docs

* Fix error where import fields that are None can cause Source.mapRawData() to crash in some cases

* Updated TODO and NEW_FEATURES

* There will be no daykeys if the daily summaries have been dropped.

* Restructure usage string, hard code transaction days

* dry-run goes with fix-strings, not check-strings

* Log daily summary version

* No need to check for weewx.debug

* include examples in manifest

* added examples and extensions to data_files

* do not install sample extensions

* Can now specify date field separator for Cumulus imports, weewx -> weeWX

* Clarified option strings

* Rudimentary test of selective daily summary rebuild

* No longer allows selective rebuild of the daily summaries

if the summaries are not complete

* Hardwired UTF-8 encoding, but with a warning comment.

* Hardwired UTF-8 encoding, but with a warning comment.

* Documented Cumulus import separator config option

* Added comments about Tools.

* Changed to execute in user environment

* Ported to PyMySQL as an alternative to python-mysqldb

See https://github.com/PyMySQL/PyMySQL for a description of PyMySQL

* Recognize additional MySQL "Can't connect" error

* Fixed error in test suites

Subsequent tests depended on ordering of a dictionary.

* pymysql seems to have problems connecting via file socket

unless it is told explicitly about it.

* Workaround for pypy compiler

* Defaults now support MySQLdb over pymysql

* Fixed bug in record augmentation.

The augmentation was happening without giving StdConvert a chance to do
its thing.

* More clear msg when encountering an ImportError

* Clarified the relationship between archive period and report_timing option, aded note regarding primacy of the report cycle

* Reverted back to MySQLdb only version.

* Changed config option names but never changed the code!

* One transaction for updating daily high/lows and archive record

Formerly, these were done in two transactions.

* v3.7.0a2

* Adjustable value for how long to wait after a bad uploader login.

Option retry_login. Fixes issue #212.

* Fixed Cumulus import rain field issue

wee_import will try to use field 26(AA) - midnight reset daily rainfall
but if not available due it will revert to field 9(J) or 11(L)

* Switched back to __str__ when extracting string out of template.

.respond() doesn't seem to encode Unicode characters properly

* Fix errors in wee_import WU step-by-step, remove Cumulus version caveat on rain

* Now uses dedicated test users 'weewx1' and 'weewx2'

* Formal check of the various MySQLdb exceptions.

* Added sqlite3 exceptions.

* Reworked check_strings() output (#213)

Reworked check_strings() screen and syslog output:
- now gives progress ala --rebuild-daily
- syslog is silent for --check-strings and --fix-strings with --dry-run
- left 'Preparing' (rather than 'Starting') but added 'this may take a
while' as there is a significant delay in dbmanager.genBatchRows()
initialising at line 619 (well there was for 400k records)

* reduce debug log spewage in wmr300 driver

* Finished formal test of errors

* First cut at V3.7 exception hierarchy

* Ported the weedb sqlite driver to the new exception hierarchy.

* Ported MySQLdb to the new database exception hierarchy.

* windSpeed fix now gracefully handles no windSpeed summary table, tweaked --update output/logging

* Now picks up absence of windSpeed daily summary

* The weedb Connection object can now be used in a "with" clause.

* The weedb Cursor object can now be used in a "with" clause.

* V3.7.0a3

* more code removal

* code formatting only

* use apt instructions for debian installs

* Reworked --wee-database section of Utilities Guide to reflect current wee-database operation

- revised usage
- reword --rebuild-daily
- reword --check
- reword --update

* More details on upgrading

* Moved start time to just before applying the patch

* Accumulator is now initialized with override values from weewx.conf

* Added sentences about wee_import/interval and weight patching multiple dbs

* remove extensions from rpm and deb packages

* v3.7.0b1

* Cleaned up some HTML warnings.

* must do a try loop at the read level so we can skip the no data 'errors' and return empty buffer so that a subsequent write will get the station to talk again

* Slightly more robust mechanism for decoding last time a file was FTP'd.

* adjust wording of weighting description

* added examples

* simplify

* fixed typo

* fixed typo

* read /etc/default before bailing out

* make init script work properly with /etc/default/weewx

* make output consistent

* more simplification

* keep PEP happy

* avoid resource consumption from slow reports by extending the StdReport.max_wait.  provide log messages when it happens.

* new features have move to the roadmap

* no more todo items left

* do not emit default sensor_map to the config

* aborted attempt to get additional battery status

* bump to 3.7.0b2

* do recipe using wget instead of curl

* forgot the O option to wget

* do not warn when calibrations are ignored - the implementation resulted in too many log entries

* get rid of tabs

* added notes about wmr300 rain counter and logger

* added norwegian 'no data available' as 'Ingen data er tilgjengelige'

* simplify.  eliminate more passive voice and gerunds.

* avoid run-on

* provide better feedback for operational errors.  make manager logging more consistent.

* fix typo in wmr300 ConfEditor

* decode heatindex and windchill from wmr300 sensor outputs

* fixed bad extract_signed invocation

* fixed wmr* partial packets note

* added mysqldb install instructions to userguide

* minor html fixes

* added link to wee_extension

* clarify acurite sensor transmission periods.

* added battery status for all wmr100 remote t/h sensors

* added battery status for all wmr100 remote t sensors

* document changes to wmr100, wmr200, wmr9x8 drivers.  fix 'Calculatios' typo.

* fixed inverted wmr200 battery status

* rename fault_out to out_fault to match pattern of other faults

* make battery status labels consistent across all wmr drivers

* wmr300 driver moves from rc to 0.18

* bump to 3.7.0b3

* css fixes: neutralize the glaring yellow; brighten the note green to more closely match the tone of warning red; @media tweaks to match font changes.

* fix column title

* distinguish selection color from code color

* fixed shift bug in weewx-multi

* Fixed (I think) issue #219

* update logwatch script to properly handle revised generator log messages

refer commit
03c3e4ef57 (diff-3cefdd7265f340e9683b0a2d0417b70f)

* normalize the quick-start

* Merge branch 'development', remote branch 'origin'

* fix layout table width on installation pages

* v3.7.0b4

* use released_versions instead of previous_versions

* Merge branch 'development', remote branch 'origin'

* parameterize release rule.  make release rule idempotent.

* replace cheetahtemplate.org with pythonhosted.org

* wee_database --help output was slightly different to reality

* Removed BOM at beginning of customizing.htm

* wee_database --help output was slightly different to reality

* Reworded comment on whether to --update daily summaries.

* cater for change in manager log output

* remove misleading windGustDir info

* bump to 3.7.0

* Added date to change log

* escape the dollars in release target

* fixed log syntax

* adjust log level for wmr100 bad usb report

* emit rapidfire cache info only when debug >= 3

* enable post_interval overrides for WOW uploader

* Fixes issue #230, exception when using Rapidfire with metric units

* Added StdRESTbase back in.

It seems that some uploaders still depend on it.

* Fixed problem that prevented a MySQL port from being specified.

* Added antialias GIF to list of files to be installed.

* Make sure GIF files get uploaded

* distribute examples in a single directory

* distinguish docs/examples vs examples

* Fixed bug that prevented a port from being specified for MySQL installations.

* Removed redundant change log entry

* Add MySQL Error 2003 to exceptions (#234)

* Added PR #234 to change log

* Documented change in location of the examples

* update examples paths in remaining guides.  explicitly list all path changes for examples.

* By default, autocommit is now enabled for the MySQL driver. Fixes issue #237.

Included regression test.
2017-03-20 00:41:34 -07:00

683 lines
31 KiB
Python

#
# Copyright (c) 2009-2017 Tom Keffer <tkeffer@gmail.com> and
# Gary Roderick <gjroderick@gmail.com>
#
# See the file LICENSE.txt for your full rights.
#
"""Classes to support fixes or other bulk corrections of weewx data."""
from __future__ import with_statement
# standard python imports
import datetime
import sys
import syslog
import time
# weewx imports
import weedb
import weeutil.weeutil
import weewx.manager
from weeutil.weeutil import timestamp_to_string, startOfDay, tobool
# ============================================================================
# class DatabaseFix
# ============================================================================
class DatabaseFix(object):
"""Base class for fixing bulk data in the weewx database.
Classes for applying different fixes the weewx database data should be
derived from this class. Derived classes require:
run() method: The entry point to apply the fix.
fix config dict: Dictionary containing config data specific to
the fix. Minimum fields required are:
name. The name of the fix. String.
"""
def __init__(self, config_dict, fix_config_dict):
"""A generic initialisation."""
# save our weewx config dict
self.config_dict = config_dict
# save our fix config dict
self.fix_config_dict = fix_config_dict
# get our name
self.name = fix_config_dict['name']
# is this a dry run
self.dry_run = tobool(fix_config_dict.get('dry_run', True))
def run(self):
raise NotImplementedError("Method 'run' not implemented")
def genSummaryDaySpans(self, start_ts, stop_ts, obs='outTemp'):
"""Generator to generate a sequence of daily summary day TimeSpans.
Given an observation that has a daily summary table, generate a
sequence of TimeSpan objects for each row in the daily summary table.
In this way the generated sequence includes only rows included in the
daily summary rather than any 'missing' rows.
Input parameters:
start_ts: Include daily summary rows with a dateTime >= start_ts
stop_ts: Include daily summary rows with a dateTime <>= start_ts
obs: The weewx observation whose daily summary table is to be
used as the source of the TimeSpan objects
Returns:
A sequence of day TimeSpan objects
"""
_sql = "SELECT dateTime FROM %s_day_%s "\
" WHERE dateTime >= ? AND dateTime <= ?" % (self.dbm.table_name, obs)
_cursor = self.dbm.connection.cursor()
try:
for _row in _cursor.execute(_sql, (start_ts, stop_ts)):
yield weeutil.weeutil.archiveDaySpan(_row[0], grace=0)
finally:
_cursor.close()
def first_summary_ts(self, obs_type):
"""Obtain the timestamp of the earliest daily summary entry for an
observation type.
Imput:
obs_type: The observation type whose daily summary is to be checked.
Returns:
The timestamp of the earliest daily summary entry for obs_tpye
observation. None is returned if no record culd be found.
"""
_sql_str = "SELECT MIN(dateTime) FROM %s_day_%s" % (self.dbm.table_name,
obs_type)
_row = self.dbm.getSql(_sql_str)
if _row:
return _row[0]
return None
@staticmethod
def _progress(record, ts):
"""Utility function to show our progress while processing the fix.
Override in derived class to provide a different progress display.
To do nothing override with a pass statement.
"""
print >>sys.stdout, "Fixing database record: %d; Timestamp: %s\r" % \
(record, timestamp_to_string(ts)),
sys.stdout.flush()
# ============================================================================
# class WindSpeedRecalculation
# ============================================================================
class WindSpeedRecalculation(DatabaseFix):
"""Class to recalculate windSpeed daily maximum value. To recalculate the
windSpeed daily maximum values:
1. Create a dictionary of parameters required by the fix. The
WindSpeedRecalculation class uses the following parameters as indicated:
name: Name of the fix, for the windSpeed recalculation fix
this is 'windSpeed Recalculation'. String. Mandatory.
binding: The binding of the database to be fixed. Default is
the binding specified in weewx.conf [StdArchive].
String, eg 'binding_name'. Optional.
trans_days: Number of days of data used in each database
transaction. Integer, default is 50. Optional.
dry_run: Process the fix as if it was being applied but do not
write to the database. Boolean, default is True.
Optional.
2. Create an WindSpeedRecalculation object passing it a weewx config dict
and a fix config dict.
3. Call the resulting object's run() method to apply the fix.
"""
def __init__(self, config_dict, fix_config_dict):
"""Initialise our WindSpeedRecalculation object."""
# call our parents __init__
super(WindSpeedRecalculation, self).__init__(config_dict, fix_config_dict)
# log if a dry run
if self.dry_run:
syslog.syslog(syslog.LOG_INFO,
"maxwindspeed: This is a dry run. Maximum windSpeed will be recalculated but not saved.")
# Get the binding for the archive we are to use. If we received an
# explicit binding then use that otherwise use the binding that
# StdArchive uses.
try:
db_binding = fix_config_dict['binding']
except KeyError:
if 'StdArchive' in config_dict:
db_binding = config_dict['StdArchive'].get('data_binding',
'wx_binding')
else:
db_binding = 'wx_binding'
self.binding = db_binding
# get a database manager object
self.dbm = weewx.manager.open_manager_with_config(config_dict,
self.binding)
syslog.syslog(syslog.LOG_DEBUG,
"maxwindspeed: Using database binding '%s', "
"which is bound to database '%s'." %
(self.binding, self.dbm.database_name))
# number of days per db transaction, default to 50.
self.trans_days = int(fix_config_dict.get('trans_days', 50))
syslog.syslog(syslog.LOG_DEBUG,
"maxwindspeed: Database transactions will use %s days of data." % self.trans_days)
def run(self):
"""Main entry point for applying the windSpeed Calculation fix.
Recalculating the windSpeed daily sumamry max field from archive data
is idempotent so there is no need to check wheteher the fix has already
been applied. Just go ahead and do it catching any exceptions we know
may be raised.
"""
# apply the fix but be prepared to catch any exceptions
try:
self.do_fix()
except weedb.NoTableError:
raise
except weewx.ViolatedPrecondition, e:
syslog.syslog(syslog.LOG_ERR,
"maxwindspeed: %s not applied: %s" % (self.name, e))
# raise the error so caller can deal with it if they want
raise
def do_fix(self):
"""Recalculate windSpeed daily sumamry max field from archive data.
Step through each row in the windSpeed daily summary table and replace
the max field with the max value for that day based on archive data.
Database transactions are done in self.trans_days days at a time.
"""
t1 = time.time()
syslog.syslog(syslog.LOG_INFO,
"maxwindspeed: Applying %s..." % self.name)
# get the start and stop Gregorian day number
start_ts = self.first_summary_ts('windSpeed')
start_greg = weeutil.weeutil.toGregorianDay(start_ts)
stop_greg = weeutil.weeutil.toGregorianDay(self.dbm.last_timestamp)
# initialise a few things
day = start_greg
n_days = 0
last_start = None
while day <= stop_greg:
# get the start and stop timestamps for this tranche
tr_start_ts = weeutil.weeutil.startOfGregorianDay(day)
tr_stop_ts = weeutil.weeutil.startOfGregorianDay(day + self.trans_days - 1)
# start the transaction
with weedb.Transaction(self.dbm.connection) as _cursor:
# iterate over the rows in the windSpeed daily summary table
for day_span in self.genSummaryDaySpans(tr_start_ts, tr_stop_ts, 'windSpeed'):
# get the days max windSpeed and the time it occurred from
# the archive
(day_max_ts, day_max) = self.get_archive_span_max(day_span, 'windSpeed')
# now save the value and time in the applicable row in the
# windSpeed daily summary, but only if its not a dry run
if not self.dry_run:
self.write_max('windSpeed', day_span.start,
day_max, day_max_ts)
# increment our days done counter
n_days += 1
# give the user some information on progress
if n_days % 50 == 0:
self._progress(n_days, day_span.start)
last_start = day_span.start
# advance to the next tranche
day += self.trans_days
# we have finished, give the user some final information on progress,
# mainly so the total tallies with the log
self._progress(n_days, last_start)
print >>sys.stdout
tdiff = time.time() - t1
# We are done so log and inform the user
syslog.syslog(syslog.LOG_INFO,
"maxwindspeed: Maximum windSpeed calculated for %s days in %0.2f seconds." % (n_days, tdiff))
if self.dry_run:
syslog.syslog(syslog.LOG_INFO,
"maxwindspeed: This was a dry run. %s was not applied." % self.name)
def get_archive_span_max(self, span, obs):
"""Find the max value of an obs and its timestamp in a span based on
archive data.
Gets the max value of an observation and the timestamp at which it
occurred from a TimeSpan of archive records. Raises a
weewx.ViolatedPrecondition error if the max value of the observation
field could not be determined.
Input parameters:
span: TimesSpan object of the period from which to determine
the interval value.
obs: The observation to be used.
Returns:
A tuple of the format:
(timestamp, value)
where:
timestamp is the epoch timestamp when the max value occurred
value is the max value of the observation over the time span
If no observation field values are found then a
weewx.ViolatedPrecondition error is raised.
"""
select_str = "SELECT dateTime, %(obs_type)s FROM %(table_name)s "\
"WHERE dateTime > %(start)s AND dateTime <= %(stop)s AND "\
"%(obs_type)s = (SELECT MAX(%(obs_type)s) FROM %(table_name)s "\
"WHERE dateTime > %(start)s and dateTime <= %(stop)s) AND "\
"%(obs_type)s IS NOT NULL"
interpolate_dict = {'obs_type': obs,
'table_name': self.dbm.table_name,
'start': span.start,
'stop': span.stop}
_row = self.dbm.getSql(select_str % interpolate_dict)
if _row:
try:
return _row[0], _row[1]
except IndexError:
_msg = "'%s' field not found in archive day %s." % (obs, span)
raise weewx.ViolatedPrecondition(_msg)
else:
return None, None
def write_max(self, obs, row_ts, value, when_ts, cursor=None):
"""Update the max and maxtime fields in an existing daily summary row.
Updates the max and maxtime fields in a row in a daily summary table.
Input parameters:
obs: The observation to be used. the daily sumamry updated will
be xxx_day_obs where xxx is the database archive table name.
row_ts: Timestamp of the row to be uodated.
value: The value to be saved in field max
when_ts: The timestamp to be saved in field maxtime
cursor: Cursor object for the database connection being used.
Returns:
Nothing.
"""
_cursor = cursor or self.dbm.connection.cursor()
max_update_str = "UPDATE %s_day_%s SET %s=?,%s=? WHERE datetime=?" % (self.dbm.table_name,
obs,
'max',
'maxtime')
_cursor.execute(max_update_str, (value, when_ts, row_ts))
if cursor is None:
_cursor.close()
@staticmethod
def _progress(ndays, last_time):
"""Utility function to show our progress while processing the fix."""
print >>sys.stdout, "Updating 'windSpeed' daily summary: %d; Timestamp: %s\r" % \
(ndays, timestamp_to_string(last_time, format_str="%Y-%m-%d")),
sys.stdout.flush()
# ============================================================================
# class IntervalWeighting
# ============================================================================
class IntervalWeighting(DatabaseFix):
"""Class to apply an interval based weight factor to the daily summaries.
To apply the interval weight factor:
1. Create a dictionary of parameters required by the fix. The
IntervalWeighting class uses the following parameters as indicated:
name: Name of the class defining the fix, for the interval
weighting fix this is 'Interval Weighting'. String.
Mandatory.
binding: The binding of the database to be fixed. Default is
the binding specified in weewx.conf [StdArchive].
String, eg 'binding_name'. Optional.
trans_days: Number of days to be fixed in each database
transaction. Integer, default is 50. Optional.
dry_run: Process the fix as if it was being applied but do not
write to the database. Boolean, default is True.
Optional.
2. Create an IntervalWeighting object passing it a weewx config dict and a
fix config dict.
3. Call the resulting object's run() method to apply the fix.
"""
def __init__(self, config_dict, fix_config_dict):
"""Initialise our IntervalWeighting object."""
# call our parents __init__
super(IntervalWeighting, self).__init__(config_dict, fix_config_dict)
# Get the binding for the archive we are to use. If we received an
# explicit binding then use that otherwise use the binding that
# StdArchive uses.
try:
db_binding = fix_config_dict['binding']
except KeyError:
if 'StdArchive' in config_dict:
db_binding = config_dict['StdArchive'].get('data_binding',
'wx_binding')
else:
db_binding = 'wx_binding'
self.binding = db_binding
# Get a database manager object
self.dbm = weewx.manager.open_manager_with_config(config_dict,
self.binding)
# Number of days per db transaction, default to 50.
self.trans_days = int(fix_config_dict.get('trans_days', 50))
def run(self):
"""Main entry point for applying the interval weighting fix.
Check archive records of unweighted days to see if each day of records
has a unique interval value. If interval value is unique then apply the
weighting. Catch any exceptions and raise as necessary. If any one day
has multiple interval value then we cannot weight the daily summaries,
instead rebuild the daily summaries.
"""
# first do some logging about what we will do
if self.dry_run:
syslog.syslog(syslog.LOG_INFO,
"intervalweighting: This is a dry run. Interval weighting will be applied but not saved.")
syslog.syslog(syslog.LOG_INFO,
"intervalweighting: Using database binding '%s', "
"which is bound to database '%s'." %
(self.binding, self.dbm.database_name))
syslog.syslog(syslog.LOG_DEBUG,
"intervalweighting: Database transactions will use %s days of data." % self.trans_days)
# Check metadata 'Version' value, if its greater than 1.0 we are
# already weighted
_daily_summary_version = self.dbm._read_metadata('Version')
if _daily_summary_version is None or _daily_summary_version < '2.0':
# Get the ts of the (start of the) next day to weight; it's the day
# after the ts of the last successfully weighted daily summary
_last_patched_ts = self.dbm._read_metadata('lastWeightPatch')
if _last_patched_ts:
_next_day_to_patch_dt = datetime.datetime.fromtimestamp(int(_last_patched_ts)) + datetime.timedelta(days=1)
_next_day_to_patch_ts = time.mktime(_next_day_to_patch_dt.timetuple())
else:
_next_day_to_patch_ts = None
# Check to see if any days that need to be weighted have multiple
# distinct interval values
if self.unique_day_interval(_next_day_to_patch_ts):
# We have a homogeneous intervals for each day so we can weight
# the daily summaries.
# Now apply the weighting but be prepared to catch any
# exceptions
try:
self.do_fix(_next_day_to_patch_ts)
# If we arrive here the fix was applied, if this is not
# a dry run then set the 'Version' metadata field to
# indicate we have updated to version 2.0.
if not self.dry_run:
with weedb.Transaction(self.dbm.connection) as _cursor:
self.dbm._write_metadata('Version', '2.0', _cursor)
except weewx.ViolatedPrecondition, e:
syslog.syslog(syslog.LOG_INFO,
"intervalweighting: %s not applied: %s"
% (self.name, e))
# raise the error so caller can deal with it if they want
raise
else:
# At least one day that needs to be weighted has multiple
# distinct interval values. We cannot apply the weighting by
# manipulating the existing daily summaries so we will weight
# by rebuilding the daily summaries. Rebuild is destructive so
# only do it if this is not a dry run
if not self.dry_run:
syslog.syslog(syslog.LOG_DEBUG,
"intervalweighting: Multiple distinct 'interval' values found for at least one archive day.")
syslog.syslog(syslog.LOG_INFO,
"intervalweighting: %s will be applied by dropping and rebuilding daily summaries." % self.name)
self.dbm.drop_daily()
self.dbm.close()
# Reopen to force rebuilding of the schema
self.dbm = weewx.manager.open_manager_with_config(self.config_dict,
self.binding,
initialize=True)
# This will rebuild to a V2 daily summary
self.dbm.backfill_day_summary()
else:
# daily summaries are already weighted
syslog.syslog(syslog.LOG_INFO,
"intervalweighting: %s has already been applied." % self.name)
def do_fix(self, np_ts):
"""Apply the interval weighting fix to the daily summaries."""
# do we need to weight? Only weight if next day to weight ts is None or
# there are records in the archive from that day
if np_ts is None or self.dbm.last_timestamp > np_ts:
t1 = time.time()
syslog.syslog(syslog.LOG_INFO, "intervalweighting: Applying %s..." % self.name)
_days = 0
# Get the earliest daily summary ts and the obs that it came from
first_ts, obs = self.first_summary()
# Get the start and stop ts for our first transaction days
_tr_start_ts = np_ts if np_ts is not None else first_ts
_tr_stop_dt = datetime.datetime.fromtimestamp(_tr_start_ts) + datetime.timedelta(days=self.trans_days)
_tr_stop_ts = time.mktime(_tr_stop_dt.timetuple())
_tr_stop_ts = min(startOfDay(self.dbm.last_timestamp), _tr_stop_ts)
last_start = None
while True:
with weedb.Transaction(self.dbm.connection) as _cursor:
for _day_span in self.genSummaryDaySpans(_tr_start_ts, _tr_stop_ts, obs):
# Get the weight to be applied for the day
_weight = self.get_interval(_day_span) * 60
# Get the current day stats in an accumulator
_day_accum = self.dbm._get_day_summary(_day_span.start)
# Set the unit system for the accumulator
_day_accum.unit_system = self.dbm.std_unit_system
# Weight the necessary accumulator stats, use a
# try..except in case something goes wrong
last_key = None
try:
for _day_key in self.dbm.daykeys:
last_key = _day_key
_day_accum[_day_key].wsum *= _weight
_day_accum[_day_key].sumtime *= _weight
# Do we have a vecstats accumulator?
if hasattr(_day_accum[_day_key], 'wsquaresum'):
# Yes, so update the weighted vector stats
_day_accum[_day_key].wsquaresum *= _weight
_day_accum[_day_key].xsum *= _weight
_day_accum[_day_key].ysum *= _weight
_day_accum[_day_key].dirsumtime *= _weight
except Exception, e:
# log the exception and re-raise it
syslog.syslog(syslog.LOG_INFO,
"intervalweighting: Interval weighting of '%s' daily summary "
"for %s failed: %s" %
(last_key, timestamp_to_string(_day_span.start, format="%Y-%m-%d"), e))
raise
# Update the daily summary with the weighted accumulator
if not self.dry_run:
self.dbm._set_day_summary(_day_accum, None, _cursor)
_days += 1
# Save the ts of the weighted daily summary as the
# 'lastWeightPatch' value in the archive_day__metadata
# table
if not self.dry_run:
self.dbm._write_metadata('lastWeightPatch',
_day_span.start,
_cursor)
# Give the user some information on progress
if _days % 50 == 0:
self._progress(_days, _day_span.start)
last_start = _day_span.start
# Setup our next tranche
# Have we reached the end, if so break to finish
if _tr_stop_ts >= startOfDay(self.dbm.last_timestamp):
break
# More to process so set our start and stop for the next
# transaction
_tr_start_dt = datetime.datetime.fromtimestamp(_tr_stop_ts) + datetime.timedelta(days=1)
_tr_start_ts = time.mktime(_tr_start_dt.timetuple())
_tr_stop_dt = datetime.datetime.fromtimestamp(_tr_start_ts) + datetime.timedelta(days=self.trans_days)
_tr_stop_ts = time.mktime(_tr_stop_dt.timetuple())
_tr_stop_ts = min(self.dbm.last_timestamp, _tr_stop_ts)
# We have finished. Get rid of the no longer needed lastWeightPatch
with weedb.Transaction(self.dbm.connection) as _cursor:
_cursor.execute("DELETE FROM %s_day__metadata WHERE name=?" % self.dbm.table_name, ('lastWeightPatch',))
# Give the user some final information on progress,
# mainly so the total tallies with the log
self._progress(_days, last_start)
print >>sys.stdout
tdiff = time.time() - t1
# We are done so log and inform the user
syslog.syslog(syslog.LOG_INFO,
"intervalweighting: calculated weighting for %s days in %0.2f seconds." % (_days, tdiff))
if self.dry_run:
syslog.syslog(syslog.LOG_INFO, "intervalweighting: This was a dry run. %s was not applied." % self.name)
else:
# we didn't need to weight so inform the user
syslog.syslog(syslog.LOG_INFO,
"intervalweighting: %s has already been applied."
% self.name)
def get_interval(self, span):
"""Return the interval field value used in a span.
Gets the interval field value from a TimeSpan of records. Raises a
weewx.ViolatedPrecondition error if the interval field value could not
be determined.
Input parameters:
span: TimesSpan object of the period from which to determine
the interval value.
Returns:
The interval field value in minutes, if no interval field values
are found then a weewx.ViolatedPrecondition error is raised.
"""
_row = self.dbm.getSql(
"SELECT `interval` FROM %s WHERE dateTime > ? AND dateTime <= ?;"
% self.dbm.table_name, span)
try:
return _row[0]
except IndexError:
_msg = "'interval' field not found in archive day %s." % (span, )
raise weewx.ViolatedPrecondition(_msg)
def unique_day_interval(self, timestamp):
"""Check a weewx archive for homogenious interval values for each day.
An 'archive day' of records includes all records whose dateTime value
is greater than midnight at the start of the day and less than or equal
to midnight at the end of the day. Each 'archive day' of records is
checked for multiple distinct interval values.
Input parameters:
timestamp: check archive days containing timestamp and later. If
None then all archive days are checked.
Returns:
True if all checked archive days have a single distinct interval
value. Otherwise returns False (ie if more than one distinct
interval value is found in any one archive day).
"""
t1 = time.time()
syslog.syslog(syslog.LOG_DEBUG,
"intervalweighting: Checking table '%s' for multiple "
"'interval' values per day..." % self.dbm.table_name)
start_ts = timestamp if timestamp else self.dbm.first_timestamp
_days = 0
_result = True
for _day_span in weeutil.weeutil.genDaySpans(start_ts,
self.dbm.last_timestamp):
_row = self.dbm.getSql("SELECT MIN(`interval`),MAX(`interval`) FROM %s "
"WHERE dateTime > ? AND dateTime <= ?;" % self.dbm.table_name, _day_span)
try:
# If MIN and MAX are the same then we only have 1 distinct
# value. If the query returns nothing then that is fine too,
# probably no archive data for that day.
_result = _row[0] == _row[1] if _row else True
except IndexError:
# Something is seriously amiss, raise an error
raise weewx.ViolatedPrecondition("Invalid 'interval' data detected in archive day %s." % (_day_span, ))
_days += 1
if not _result:
break
if _result:
syslog.syslog(syslog.LOG_DEBUG,
"intervalweighting: Successfully checked %s days "
"for multiple 'interval' values in %0.2f seconds." % (_days, (time.time() - t1)))
return _result
def first_summary(self):
"""Obtain the timestamp and observation name of the earliest daily
summary entry.
It is possible the earliest dateTime value in the daily summary tables
will be different from table to table. To find the earliest dateTime
value we loop through each daily summary table finding the earliest
dateTime value for each table and then take the earliest value of these.
Returns:
A tuple of the form (timestamp, observation)
where:
timestamp: The earliest timestamp across all daily summary
tables
observation: The observation whose daily summary table has the
earliest timestamp
(None, None) is returned if no dateTime values were found.
"""
_res = (None, None)
for _key in self.dbm.daykeys:
_ts = self.first_summary_ts(_key)
if _ts:
_res = (weeutil.weeutil.min_with_none((_res[0], _ts)), _key)
return _res
@staticmethod
def _progress(ndays, last_time):
"""Utility function to show our progress while processing the fix."""
print >>sys.stdout, "Weighting daily summary: %d; Timestamp: %s\r" % \
(ndays, timestamp_to_string(last_time, format_str="%Y-%m-%d")),
sys.stdout.flush()