Files
weewx/bin/wee_database
gjr80 d39230cc94 Finished implementing times for most actions, various rewording/reformatting
- removed a lot of old v2 wording that referred to the 'archive
database'
2017-01-09 00:24:18 +10:00

662 lines
32 KiB
Python
Executable File

#!/usr/bin/python
#
# Copyright (c) 2009-2015 Tom Keffer <tkeffer@gmail.com>
#
# See the file LICENSE.txt for your full rights.
#
"""Configure the databases used by weewx"""
from __future__ import with_statement
# python imports
import datetime
import optparse
import syslog
import sys
import time
from datetime import datetime as dt
from distutils.version import StrictVersion
# weewx imports
import user.extensions #@UnusedImport
import weecfg
import weecfg.patch
import weedb
import weewx
import weewx.manager
import weewx.units
from weeutil.weeutil import TimeSpan, timestamp_to_string
# Usage text handed to optparse.OptionParser in main(). It lists every
# 'verb' (action) together with the options each verb accepts, followed by a
# short description of each action.
usage = """wee_database --help
wee_database --create-archive
[CONFIG_FILE|--config=CONFIG_FILE]
[--binding=BINDING_NAME]
wee_database --drop-daily
[CONFIG_FILE|--config=CONFIG_FILE]
[--binding=BINDING_NAME]
wee_database --rebuild-daily
[CONFIG_FILE|--config=CONFIG_FILE]
[--binding=BINDING_NAME]
[--date=YYYY-mm-dd | --from=YYYY-mm-dd --to=YYYY-mm-dd]
[--trans-days=DAYS]
wee_database --reconfigure
[CONFIG_FILE|--config=CONFIG_FILE]
[--binding=BINDING_NAME]
wee_database --transfer
[CONFIG_FILE|--config=CONFIG_FILE]
[--binding=BINDING_NAME]
--dest-binding=BINDING_NAME
[--dry-run]
wee_database --check-strings
[CONFIG_FILE|--config=CONFIG_FILE]
[--binding=BINDING_NAME] [--fix-strings]
wee_database --check-weighting
[CONFIG_FILE|--config=CONFIG_FILE]
[--binding=BINDING_NAME] [--fix-weighting]
[--trans-days=DAYS]
[--dry-run]
Description:
Configure the weewx databases. Most of these functions are handled automatically by
weewx, but they may be useful in special cases.
Actions:
--create-archive Create the weewx database and initialize it with the schema.
--drop-daily Drop the daily summaries from the database.
--rebuild-daily Rebuild the daily summaries for a database from the archive table.
--reconfigure Create a new database using the schema in the configuration
file. Copy over data from the old database.
--transfer Move data from one database to another.
--check-strings Scan the archive table, looking for null strings that may
have been introduced by a SQL editing program.
--check-weighting Check whether the daily summaries have been interval weighted."""
# Epilog text handed to optparse.OptionParser in main(); a reminder about
# MySQL permissions.
epilog = """If you are using a MySQL database it is assumed that you have the
appropriate permissions for the requested operation."""
# The optparse 'dest' names of the options that act as 'verbs' (actions).
# A 'dest' may be given explicitly in add_option() or derived implicitly from
# the option string (as for --reconfigure). main() uses this list to detect
# when more than one verb has been given on the command line, so any new verb
# must have its 'dest' added here.
dest_list = ['create_archive', 'drop_daily', 'rebuild_daily',
             'reconfigure', 'check_strings', 'transfer', 'check_weighting']
def main():
    """Parse the command line and run the single requested action.

    Exactly one 'verb' (an option whose 'dest' is listed in dest_list) may
    be used per invocation. If more than one is found, or if the requested
    data binding cannot be resolved to a database in the configuration
    file, the program exits with an explanatory message and nothing is done.
    If no verb is given the user is told so and nothing is done.
    """

    # Set defaults for the system logger:
    syslog.openlog('wee_database', syslog.LOG_PID|syslog.LOG_CONS)

    # Create a command line parser:
    parser = optparse.OptionParser(usage=usage, epilog=epilog)

    # Add the various options:
    parser.add_option("--config", dest="config_path", type=str,
                      metavar="CONFIG_FILE",
                      help="Use configuration file CONFIG_FILE.")
    parser.add_option("--create-archive", dest="create_archive",
                      action='store_true',
                      help="Create the weewx database.")
    parser.add_option("--drop-daily", dest="drop_daily", action='store_true',
                      help="Drop the daily summary tables from a database.")
    parser.add_option("--rebuild-daily", dest="rebuild_daily",
                      action='store_true',
                      help="Rebuild the daily summaries for a database.")
    parser.add_option("--date", dest="date", type=str, metavar="YYYY-mm-dd",
                      help="Rebuild the daily summaries for this date. Format"
                      " is YYYY-mm-dd.")
    parser.add_option("--from", dest="date_from", type=str, metavar="YYYY-mm-dd",
                      help="Rebuild the daily summaries starting at this date."
                      " Format is YYYY-mm-dd.")
    parser.add_option("--to", dest="date_to", type=str, metavar="YYYY-mm-dd",
                      help="Rebuild the daily summaries up until this date."
                      " Format is YYYY-mm-dd.")
    parser.add_option("--reconfigure", action='store_true',
                      help="Create a new database using configuration"
                      " information found in the configuration file. In"
                      " particular, the new database will use the unit system"
                      " found in option [StdConvert][target_unit]."
                      " The new database will have the same name as the old"
                      " database, with a '_new' on the end.")
    parser.add_option("--transfer", dest="transfer", action='store_true',
                      help="Transfer the weewx archive from source database to"
                      " destination database.")
    parser.add_option("--check-strings", dest="check_strings",
                      action="store_true",
                      help="Check a sqlite version of the archive table to"
                      " see whether it contains embedded strings.")
    parser.add_option("--fix-strings", dest="fix_strings", action="store_true",
                      help="Fix any embedded strings in a sqlite database.")
    parser.add_option("--check-weighting", dest="check_weighting",
                      action='store_true',
                      help="Check whether the daily summaries have been interval"
                      " weighted.")
    parser.add_option("--fix-weighting", dest="fix_weighting", action="store_true",
                      help="Apply interval weighting to daily summaries.")
    parser.add_option("--binding", dest="binding", metavar="BINDING_NAME",
                      default='wx_binding',
                      help="The data binding. Default is 'wx_binding'.")
    parser.add_option("--dest-binding", dest="dest_binding",
                      metavar="BINDING_NAME",
                      help="The destination data binding.")
    parser.add_option('--dry-run', dest="dry_run", action='store_true',
                      default=False,
                      help='Print what would happen but do not do it. Default'
                      ' is False.')
    parser.add_option('--trans-days', dest='trans_days', metavar="DAYS",
                      help="Limit database transactions to no more than DAYS"
                      " days of data at a time. Default value is 5 when"
                      " rebuilding daily summaries and 50 when interval weighting"
                      " daily summaries weight. May be increased for a slight"
                      " speed increase or reduced to reduce memory usage.")

    # Now we are ready to parse the command line:
    (options, args) = parser.parse_args()

    # Work out which verbs were used. Every verb is a 'store_true' option
    # with no default, so a verb that was not given is left as None. Walking
    # dest_list keeps the order (and hence any error message) deterministic.
    dest_used = [dest for dest in dest_list
                 if getattr(options, dest, None) is not None]

    # More than one verb is ambiguous - what did the user really want to do?
    if len(dest_used) > 1:
        # The message needs to speak option strings (eg '--drop-daily'), not
        # 'dest' names, so map one to the other. Options without a 'dest'
        # (eg --help) are skipped.
        opt_dict = dict((opt.dest, opt.get_opt_string())
                        for opt in parser.option_list if opt.dest)
        verbs_str = ', '.join(opt_dict[dest] for dest in dest_used)
        exit_str = ("Multiple verbs found in command line %s. Only one verb permitted.\nNothing done. Aborting." %
                    (verbs_str, ))
        sys.exit(exit_str)

    # get config_dict to use
    config_path, config_dict = weecfg.read_config(options.config_path, args)
    print("Using configuration file %s" % config_path)

    db_binding = options.binding
    try:
        database = config_dict['DataBindings'][db_binding]['database']
    except KeyError:
        # Either there is no [DataBindings] section, the binding is unknown,
        # or the binding has no 'database' entry. Say so rather than leaving
        # the user with a bare KeyError traceback.
        sys.exit("Unable to find a database for binding '%s' in configuration file %s.\n"
                 "Nothing done. Aborting." % (db_binding, config_path))
    print("Using database binding '%s', which is bound to database '%s'" % (db_binding, database))

    # At most one verb is set by this point, so dispatch on the first match.
    if options.create_archive:
        createMainDatabase(config_dict, db_binding)
    elif options.drop_daily:
        dropDaily(config_dict, db_binding)
    elif options.rebuild_daily:
        rebuildDaily(config_dict, db_binding, options)
    elif options.reconfigure:
        reconfigMainDatabase(config_dict, db_binding)
    elif options.transfer:
        transferDatabase(config_dict, db_binding, options)
    elif options.check_strings:
        check_strings(config_dict, db_binding, options.fix_strings)
    elif options.check_weighting:
        check_weighting(config_dict, db_binding, options)
    else:
        # Modifiers such as --fix-strings do nothing without their verb.
        print("No action specified. Nothing done. Use --help for usage.")
def createMainDatabase(config_dict, db_binding):
    """Create the weewx database for a binding unless it already exists.

    config_dict: the configuration dictionary.
    db_binding: name of the data binding whose database is wanted.
    """
    open_manager = weewx.manager.open_manager_with_config
    try:
        # A plain open only works if the database exists and has been
        # initialized; anything else surfaces as an OperationalError.
        with open_manager(config_dict, db_binding) as existing:
            print("Database '%s' already exists. Nothing done." % (existing.database_name,))
    except weedb.OperationalError:
        # Not there yet, so open again, this time allowing initialization.
        with open_manager(config_dict, db_binding, initialize=True) as created:
            print("Created database '%s'" % (created.database_name,))
def dropDaily(config_dict, db_binding):
"""Drop the daily summaries from a weewx database"""
manager_dict = weewx.manager.get_manager_dict_from_config(config_dict,
db_binding)
database_name = manager_dict['database_dict']['database_name']
ans = None
while ans not in ['y', 'n']:
print "Proceeding will delete all your daily summaries from database '%s'" % database_name
ans = raw_input("Are you sure you want to proceed (y/n)? ")
if ans == 'y' :
t1 = time.time()
print "Dropping daily summary tables from '%s' ... " % database_name
try:
with weewx.manager.open_manager_with_config(config_dict, db_binding) as dbmanager:
try:
dbmanager.drop_daily()
except weedb.OperationalError, e:
print "Got error '%s'\nPerhaps there was no daily summary?" % e
else:
tdiff = time.time() - t1
print "Daily summary tables dropped from database '%s' in %.2f seconds" % (database_name, tdiff)
except weedb.OperationalError:
# No daily summaries. Nothing to be done.
print "No daily summaries found in database '%s'. Nothing done." % (database_name,)
def rebuildDaily(config_dict, db_binding, options):
    """Rebuild the daily summaries, optionally for a limited date range.

    config_dict: the configuration dictionary.
    db_binding: name of the data binding whose daily summaries are rebuilt.
    options: the optparse options. Uses 'trans_days' (days of data per
        transaction, default 5) and, through _parse_dates(), 'date',
        'date_from' and 'date_to'.

    Any exception raised by _parse_dates() for a bad or incomplete date
    specification propagates to the caller.
    """

    t1 = time.time()

    # Validate the date options before touching the database, so that a bad
    # date aborts the run before any tables are created.
    start_ts, stop_ts = _parse_dates(options)

    manager_dict = weewx.manager.get_manager_dict_from_config(config_dict,
                                                              db_binding)
    database_name = manager_dict['database_dict']['database_name']

    # how many days per transaction
    trans_days = int(options.trans_days) if options.trans_days else 5

    # Open up the database. This will create the tables necessary for the daily
    # summaries if they don't already exist:
    with weewx.manager.open_manager_with_config(config_dict, db_binding, initialize=True) as dbmanager:
        # advise the user/log what we will do
        if start_ts is None and stop_ts is None:
            # no date limits, so span everything the database holds
            start_ts = dbmanager.firstGoodStamp()
            stop_ts = dbmanager.lastGoodStamp()
            syslog.syslog(syslog.LOG_INFO, "All daily summaries will be rebuilt")
            print("All daily summaries will be rebuilt")
        elif options.date:
            # --date overrides --from/--to in _parse_dates(), so only the
            # single-day message applies even if --from was also given
            _msg = "Daily summaries for %s only will be rebuilt" % (timestamp_to_string(start_ts,
                                                                                       '%Y-%m-%d'))
            syslog.syslog(syslog.LOG_INFO, _msg)
            print(_msg)
        else:
            # stop_ts is midnight at the end of the --to day, so step back a
            # second to display the --to date itself
            _msg = "Daily summaries from %s to %s inclusive" % (timestamp_to_string(start_ts, '%Y-%m-%d'),
                                                                timestamp_to_string(stop_ts-1, '%Y-%m-%d'))
            _msg2 = "will be rebuilt"
            syslog.syslog(syslog.LOG_INFO, _msg + " " + _msg2)
            print(_msg)
            print(_msg2)

        syslog.syslog(syslog.LOG_INFO, "Rebuilding daily summaries in database '%s' ..." % database_name)
        print("Rebuilding daily summaries in database '%s' ..." % database_name)

        # now do the actual rebuild
        nrecs, ndays = dbmanager.backfill_day_summary(trans_days=trans_days,
                                                      start_ts=start_ts,
                                                      stop_ts=stop_ts,
                                                      rebuild=True)
    tdiff = time.time() - t1

    # advise the user/log what we did
    syslog.syslog(syslog.LOG_INFO, "Rebuild of daily summaries in database '%s' complete" % database_name)
    if nrecs:
        sys.stdout.flush()
        print("Processed %d records to rebuild %d day summaries in %.2f seconds " % (nrecs,
                                                                                    ndays,
                                                                                    tdiff))
        print("Rebuild of daily summaries in database '%s' complete" % database_name)
    else:
        print("Daily summaries up to date in '%s'" % database_name)
def reconfigMainDatabase(config_dict, db_binding):
    """Copy a database into a new one named '<old name>_new'.

    The new database uses the schema from the binding and, if
    [StdConvert][target_unit] is set in the configuration, that unit system.
    The user is asked to confirm before an existing '_new' database is
    dropped and again before the copy is made.

    config_dict: the configuration dictionary.
    db_binding: name of the data binding whose database is copied.
    """
    manager_dict = weewx.manager.get_manager_dict_from_config(config_dict,
                                                              db_binding)
    old_db_dict = manager_dict['database_dict']
    old_name = old_db_dict['database_name']

    # Work on a copy of the database dictionary; only the name differs.
    new_db_dict = dict(old_db_dict)
    new_db_dict['database_name'] = old_name + '_new'
    new_name = new_db_dict['database_name']

    # Creating the new database tells us whether one is already there. If it
    # is, ask before deleting it.
    try:
        weedb.create(new_db_dict)
    except weedb.DatabaseExists:
        reply = None
        while reply not in ['y', 'n']:
            reply = raw_input("New database '%s' already exists. Delete it first (y/n)? " % (new_name,))
        if reply == 'n':
            print("Nothing done.")
            return
        weedb.drop(new_db_dict)

    # Unit system of the existing database:
    with weewx.manager.Manager.open(old_db_dict) as old_dbmanager:
        old_unit_system = old_dbmanager.std_unit_system

    if old_unit_system is None:
        print("Old database has not been initialized. Nothing to be done.")
        return

    # Unit system wanted for the new database, if the configuration names one:
    try:
        target_unit_nickname = config_dict['StdConvert']['target_unit']
    except KeyError:
        target_unit_system = None
    else:
        target_unit_system = weewx.units.unit_constants[target_unit_nickname.upper()]

    same_units = target_unit_system is None or old_unit_system == target_unit_system

    # Describe what is about to happen and ask until we get a clear answer.
    reply = None
    while reply not in ['y', 'n']:
        print("Copying database '%s' to '%s'" % (old_name, new_name))
        if same_units:
            print("The new database will use the same unit system as the old ('%s')." %
                  (weewx.units.unit_nicknames[old_unit_system],))
        else:
            print("Units will be converted from the '%s' system to the '%s' system." %
                  (weewx.units.unit_nicknames[old_unit_system],
                   weewx.units.unit_nicknames[target_unit_system]))
        reply = raw_input("Are you sure you wish to proceed (y/n)? ")

    if reply == 'n':
        print("Nothing done.")
        return

    started = time.time()
    weewx.manager.reconfig(old_db_dict,
                           new_db_dict,
                           new_unit_system=target_unit_system,
                           new_schema=manager_dict['schema'])
    elapsed = time.time() - started
    print("Database '%s' copied to '%s' in %.2f seconds." % (old_name, new_name, elapsed))
def check_strings(config_dict, db_binding, fix=False):
    """Scan the archive table for values of the wrong Python type.

    Each non-null value is checked against the Python type implied by the
    schema type of its column. Offending values are reported and, if 'fix'
    is set, coerced to the proper type (or set to None if that fails).

    config_dict: the configuration dictionary.
    db_binding: name of the data binding to be checked.
    fix: True to correct the values found, otherwise they are only reported.
    """
    started = time.time()
    print("Checking archive table for strings...")

    # Python type for each schema type we know about. A schema type that is
    # not listed here is left exactly as reported.
    pytype_for = {'INTEGER': int, 'REAL': float, 'STR': str}

    with weewx.manager.open_manager_with_config(config_dict, db_binding) as dbmanager:
        col_names = []
        col_pytypes = []
        for column in dbmanager.connection.genSchemaOf('archive'):
            # column[1] is the observation type (eg, 'outTemp'), column[2]
            # its schema type
            col_names.append(column[1])
            col_pytypes.append(pytype_for.get(column[2], column[2]))

        n_found = 0
        n_fixed = 0
        for record in dbmanager.genBatchRows():
            for icol, value in enumerate(record):
                # nulls and values of the expected type are fine
                if value is None or isinstance(value, col_pytypes[icol]):
                    continue
                n_found += 1
                sys.stdout.write("Timestamp = %s; record['%s']= %r; ... " % (record[0],
                                                                             col_names[icol],
                                                                             value))
                if not fix:
                    sys.stdout.write("ignored.\n")
                    continue
                # Coerce to the correct type, falling back to None
                try:
                    corrected_value = col_pytypes[icol](value)
                except ValueError:
                    corrected_value = None
                # record[0] is used as the key of the row to be updated
                dbmanager.updateValue(record[0], col_names[icol], corrected_value)
                n_fixed += 1
                sys.stdout.write("changed to %r\n" % corrected_value)

    elapsed = time.time() - started
    if fix:
        print("String check complete. %d of %d null strings found were fixed in %.2f seconds" % (n_fixed,
                                                                                                 n_found,
                                                                                                 elapsed))
    else:
        print("String check complete. %d null strings found in %.2f seconds" % (n_found,
                                                                                elapsed))
def transferDatabase(config_dict, db_binding, options):
    """Transfer 'archive' data from one database to another.

    config_dict: the configuration dictionary.
    db_binding: name of the source data binding.
    options: the optparse options. Uses 'dest_binding' (required, the
        destination data binding) and 'dry_run' (report what would be
        transferred without doing it).

    Every failure that is handled here prints an explanation and returns
    without transferring anything.
    """

    # do we have enough to go on, must have a dest binding
    if options.dest_binding is None:
        print("Destination binding not specified. Nothing Done. Aborting.")
        return

    # get manager dict for our source binding
    src_manager_dict = weewx.manager.get_manager_dict_from_config(config_dict,
                                                                  db_binding)
    # get manager dict for our dest binding
    try:
        dest_manager_dict = weewx.manager.get_manager_dict_from_config(config_dict,
                                                                       options.dest_binding)
    except weewx.UnknownBinding:
        # if we can't find the binding display a message then return
        print("Unknown destination binding '%s', confirm destination binding." %
              (options.dest_binding, ))
        print("Nothing Done. Aborting.")
        return
    except weewx.UnknownDatabase:
        # if we can't find the database display a message then return
        print("Error accessing destination database, confirm destination binding and/or database.")
        print("Nothing Done. Aborting.")
        return
    except (ValueError, AttributeError):
        # maybe a schema issue
        print("Error accessing destination database.")
        print("Maybe the destination schema is incorrectly specified in binding '%s' in weewx.conf?" %
              (options.dest_binding, ))
        print("Nothing Done. Aborting.")
        return
    except weewx.UnknownDatabaseType:
        # maybe a [Databases] issue
        print("Error accessing destination database.")
        print("Maybe the destination database is incorrectly defined in weewx.conf?")
        print("Nothing Done. Aborting.")
        return

    dest_db_name = dest_manager_dict['database_dict']['database_name']

    # get a manager for our source
    with weewx.manager.Manager.open(src_manager_dict['database_dict']) as src_manager:
        # get first and last timestamps from the source so we can count the
        # records to transfer and display an appropriate message
        first_ts = src_manager.firstGoodStamp()
        last_ts = src_manager.lastGoodStamp()
        if first_ts is None or last_ts is None:
            # we have no source records to transfer so abort with a message
            print("No records found in source database '%s' for transfer." %
                  (src_manager.database_name, ))
            print("Nothing done. Aborting.")
            # return, as every other abort path does, rather than rely on the
            # interactive-only exit() builtin
            return
        num_recs = src_manager.getAggregate(TimeSpan(first_ts, last_ts),
                                            'dateTime', 'count')[0]

        if options.dry_run:
            # it's a dry run so say what we would have done then return
            print("Transfer %s records from source database '%s' to destination database '%s'." %
                  (num_recs, src_manager.database_name, dest_db_name))
            print("Dry run, nothing done.")
            return

        # not a dry run, so confirm that the user really wants the transfer
        ans = None
        while ans not in ['y', 'n']:
            ans = raw_input("Transfer %s records from source database '%s' to destination database '%s' (y/n)? " %
                            (num_recs, src_manager.database_name, dest_db_name))
        if ans == 'n':
            # we decided not to do the transfer
            print("Nothing done.")
            return

        t1 = time.time()
        # wrap in a try..except in case we have an error
        try:
            with weewx.manager.Manager.open_with_create(dest_manager_dict['database_dict'],
                                                        table_name=dest_manager_dict['table_name'],
                                                        schema=dest_manager_dict['schema']) as dest_manager:
                sys.stdout.write("transferring, this may take a while.... ")
                sys.stdout.flush()
                # hand the whole source record generator to the destination
                # in a single call
                dest_manager.addRecord(src_manager.genBatchRecords())
                print("complete")
                # get first and last timestamps from the dest so we can
                # count the records now held there and display a message
                first_ts = dest_manager.firstGoodStamp()
                last_ts = dest_manager.lastGoodStamp()
                tdiff = time.time() - t1
                if first_ts is not None and last_ts is not None:
                    num_recs = dest_manager.getAggregate(TimeSpan(first_ts, last_ts),
                                                         'dateTime', 'count')[0]
                    print("%s records transferred from source database '%s' to" %
                          (num_recs, src_manager.database_name))
                    print("destination database '%s' in %.2f seconds." %
                          (dest_manager.database_name, tdiff))
                else:
                    print("Error. No records were transferred from source database '%s' to destination database '%s'." %
                          (src_manager.database_name, dest_manager.database_name))
        except ImportError:
            # probably when trying to load db driver
            print("Error accessing destination database '%s'." %
                  (dest_db_name, ))
            print("Maybe the database driver is incorrectly specified in weewx.conf?")
            print("Nothing done. Aborting.")
            return
        except (OSError, weedb.OperationalError):
            # probably a weewx.conf typo or MySQL db not created
            print("Error accessing destination database '%s'." %
                  (dest_db_name, ))
            print("Maybe it does not exist (MySQL) or is incorrectly defined in weewx.conf?")
            print("Nothing done. Aborting.")
            return
def check_weighting(config_dict, db_binding, options):
    """Check whether the daily summaries have been interval weighted.

    The daily summary 'Version' metadata is read and summaries at version
    2.0 or later are reported as interval weighted. If they are not, and
    --fix-weighting was given, the user is asked to confirm and the interval
    weighting is then applied.

    config_dict: the configuration dictionary.
    db_binding: name of the data binding to be checked.
    options: the optparse options. Uses 'trans_days' (days of data per
        transaction, default 50), 'dry_run' and 'fix_weighting'.
    """

    trans_days = int(options.trans_days) if options.trans_days else 50

    # construct an interval_weighting config dict
    interval_weighting_config_dict = {'name': 'Interval Weighting',
                                      'binding': db_binding,
                                      'trans_days': trans_days,
                                      'dry_run': options.dry_run}

    # create an interval weighting object
    weight_obj = weecfg.patch.IntervalWeighting(config_dict,
                                                interval_weighting_config_dict)

    _daily_summary_version = weight_obj.read_metadata('Version')
    print("Daily summary version is %s" % _daily_summary_version)

    # Compare numerically. The version may come back as a string (TODO
    # confirm against read_metadata), and under Python 2 any string compares
    # greater than any number, which would make a direct comparison with 2.0
    # always true.
    try:
        _weighted = (_daily_summary_version is not None
                     and float(_daily_summary_version) >= 2.0)
    except (TypeError, ValueError):
        # Weighting is irreversible, so do not guess at a version we cannot
        # interpret.
        print("Unable to interpret daily summary version '%s'. Nothing done." %
              (_daily_summary_version, ))
        return

    if _weighted:
        print("Daily summary tables have been interval weighted.")
        return

    print("Daily summary tables have not been interval weighted.")
    if not options.fix_weighting:
        return

    # this action is irreversible so check the user really wants to do this
    ans = None
    while ans not in ['y', 'n']:
        ans = raw_input("Applying interval weighting to the daily summaries is irreversible. Continue (y/n)? ")
    if ans == 'y':
        # apply the interval weighting
        weight_obj.run()
    else:
        # we decided not to interval weight the summary tables
        print("Nothing done.")
def _parse_dates(options):
    """Turn the --date, --from and --to options into a pair of timestamps.

    A date given as YYYY-mm-dd is taken as local midnight at the start of
    that day. --date yields the span from the start of that day to the start
    of the next. --from/--to yield the span from the start of the --from day
    to the start of the day after the --to day. If --date is present then
    --from and --to are ignored.

    Inputs:
        options: the optparse options; 'date', 'date_from' and 'date_to'
                 are used.

    Returns:
        A (start_ts, stop_ts) tuple of integer timestamps, or (None, None)
        if none of the three options was given.

    Raises:
        ValueError: a date could not be converted.
        TypeError: only one of --from and --to was given.
        weewx.ViolatedPrecondition: --from is later than --to.
    """
    date_fmt = "%Y-%m-%d"
    one_day = datetime.timedelta(days=1)

    def _epoch(a_dt):
        # local-time datetime -> integer unix timestamp
        return int(time.mktime(a_dt.timetuple()))

    # --date wins over --from/--to. An empty string still counts as the
    # option having been used, it just won't convert.
    if options.date or options.date == "":
        try:
            day_start = dt.strptime(options.date, date_fmt)
        except ValueError:
            raise ValueError("Invalid --date option specified.")
        return (_epoch(day_start), _epoch(day_start + one_day))

    span_requested = (options.date_from or options.date_to
                      or options.date_from == '' or options.date_to == '')
    if not span_requested:
        # nothing given at all, so no limits
        return (None, None)

    # At least one of --from/--to was used; both are needed and both must
    # convert. An option that was not given is None, which strptime rejects
    # with a TypeError.
    try:
        span_start_ts = _epoch(dt.strptime(options.date_from, date_fmt))
    except TypeError:
        raise TypeError("Missing --from option. Both --from and --to must be specified.")
    except ValueError:
        raise ValueError("Invalid --from option.")

    try:
        # it is just a date, so run to the end of that day
        span_stop_ts = _epoch(dt.strptime(options.date_to, date_fmt) + one_day)
    except TypeError:
        raise TypeError("Missing --to option. Both --from and --to must be specified.")
    except ValueError:
        raise ValueError("Invalid --to option.")

    if span_start_ts > span_stop_ts:
        raise weewx.ViolatedPrecondition("--from value is later than --to value.")
    return (span_start_ts, span_stop_ts)
# Run the utility only when this file is executed as a script.
if __name__=="__main__" :
    main()