diff --git a/weewx/wxengine.py b/weewx/wxengine.py index 35255011..2695c86a 100644 --- a/weewx/wxengine.py +++ b/weewx/wxengine.py @@ -14,55 +14,345 @@ import time import socket import os.path import Queue +import threading +from optparse import OptionParser +import configobj + +import daemon import weewx import weewx.archive import weewx.stats -import weewx.wunderground import weeutil.weeutil -def main(config_dict): +usagestr = """ + %prog config_path [--daemon] + + Entry point to the weewx weather program. Can be run from the command + line, or with the '--daemon' option, as a daemon. + +Arguments: + config_path: Path to the configuration file to be used. +""" + + +#=============================================================================== +# Class StdEngine +#=============================================================================== + +class StdEngine(object): + + def __init__(self, service_list): + """Initialize an instance of StdEngine. + + For each listed service in service_list, instantiates an instance of the class, + passing self as the only argument.""" + + self.service_obj = [_get_object(svc, self) for svc in service_list] + + def setup(self): + """Run before anything else.""" + + self.parseArgs() + + # Set up the main archive database: + self.setupArchiveDatabase() + + # Set up the statistical database: + self.setupStatsDatabase() + + # Set up the weather station hardware: + self.setupStation() + + # Set a default socket time out, in case FTP or HTTP hang: + timeout = int(self.config_dict.get('socket_timeout', '20')) + socket.setdefaulttimeout(timeout) + + + # Allow each service to run its setup: + for obj in self.service_obj: + obj.setup() + + def parseArgs(self): + """Parse any command line options.""" + + parser = OptionParser(usage=usagestr) + parser.add_option("-d", "--daemon", action="store_true", dest="daemon", help="Run as a daemon") + (options, args) = parser.parse_args() + + if len(args) < 1: + sys.stderr.write("Missing argument(s).\n") + sys.stderr.write(parser.parse_args(["--help"])) + exit() + + if options.daemon: + daemon.daemonize(pidfile='/var/run/weewx.pid') + + # Try to open up the given configuration file. Declare an error if unable to. + try : + self.config_dict = configobj.ConfigObj(args[0], file_error=True) + except IOError: + sys.stderr.write("Unable to open configuration file %s" % args[0]) + syslog.syslog(syslog.LOG_CRIT, "main: Unable to open configuration file %s" % args[0]) + exit() + + syslog.syslog(syslog.LOG_INFO, "main: Using configuration file %s." % os.path.abspath(args[0])) + + if int(self.config_dict.get('debug', '0')): + # Get extra debug info. Set the logging mask for full debug info + syslog.setlogmask(syslog.LOG_UPTO(syslog.LOG_DEBUG)) + # Set the global debug flag: + weewx.debug = True + + def setupArchiveDatabase(self): + """Setup the main database archive""" + archiveFilename = os.path.join(self.config_dict['Station']['WEEWX_ROOT'], + self.config_dict['Archive']['archive_file']) + self.archive = weewx.archive.Archive(archiveFilename) + + + # Configure it if necessary (this will do nothing if the database has + # already been configured): + self.archive.config() + + + def setupStatsDatabase(self): + """Prepare the stats database""" + statsFilename = os.path.join(self.config_dict['Station']['WEEWX_ROOT'], + self.config_dict['Stats']['stats_file']) + # statsDb is an instance of weewx.stats.StatsDb, which wraps the stats sqlite file + self.statsDb = weewx.stats.StatsDb(statsFilename, + int(self.config_dict['Station'].get('heating_base', '65')), + int(self.config_dict['Station'].get('cooling_base', '65')), + int(self.config_dict['Station'].get('cache_loop_data', '1'))) + # Configure it if necessary (this will do nothing if the database has + # already been configured): + self.statsDb.config(self.config_dict['Stats'].get('stats_types')) + + # Backfill it with data from the archive. This will do nothing if + # the stats database is already up-to-date. + weewx.stats.backfill(self.archive, self.statsDb) + + def setupStation(self): + """Set up the weather station hardware.""" + # Get the hardware type from the configuration dictionary. + # This will be a string such as "VantagePro" + stationType = self.config_dict['Station']['station_type'] + + # Look for and load the module that handles this hardware type: + _moduleName = "weewx." + stationType + __import__(_moduleName) + hardware_module = sys.modules[_moduleName] + + try: + + # Now open up the weather station: + self.station = hardware_module.WxStation(self.config_dict[stationType]) + + except Exception, ex: + # Caught unrecoverable error. Log it, exit + syslog.syslog(syslog.LOG_CRIT, "main: Unable to open WX station hardware: %s" % ex) + syslog.syslog(syslog.LOG_CRIT, "main: Exiting.") + # Reraise the exception (this will eventually cause the program to exit) + raise + + + def preloop(self): + """Run every time before asking for LOOP packets""" + print "In preloop()" + + for obj in self.service_obj: + obj.preloop() + + + def processLoopPacket(self, physicalPacket): + """Run whenever a LOOP packet needs to be processed.""" + print "In processLoopPacket()" + + for obj in self.service_obj: + obj.processLoopPacket(physicalPacket) + + def getArchiveData(self): + """This function gets or calculates new archive data""" + print "In getArchiveData()" + + lastgood_ts = self.archive.lastGoodStamp() + + nrec = 0 + # Add all missed archive records since the last good record in the database + for rec in self.station.genArchivePackets(lastgood_ts) : + print"REC:-> ", weeutil.weeutil.timestamp_to_string(rec['dateTime']), rec['barometer'],\ + rec['outTemp'], rec['windSpeed'],\ + rec['windDir'], " <-" + # Add the new record to the archive database and stats database: + self.archive.addRecord(rec) + self.statsDb.addArchiveRecord(rec) + # Give each service a chance to take a look at it: + self.postArchiveData(rec) + nrec += 1 + + if nrec != 0: + syslog.syslog(syslog.LOG_INFO, "wxengine: %d new archive packets added to database" % nrec) + + def postArchiveData(self, rec): + """Run whenever any new archive data appears.""" + for obj in self.service_obj: + obj.postArchiveData(rec) + + def processArchiveData(self): + """Run after any archive data has been retrieved and put in the database.""" + print "In processArchiveData()" + + for obj in self.service_obj: + obj.processArchiveData() + + def start(self): + + self.run() + + def run(self): + + self.setup() + + syslog.syslog(syslog.LOG_INFO, "wxengine: Starting main packet loop.") + + while True: + + self.preloop() + + # Next time to ask for archive records: + nextArchive_ts = (int(time.time() / self.station.archive_interval) + 1) * self.station.archive_interval + self.station.archive_delay + + for physicalPacket in self.station.genLoopPacketsUntil(nextArchive_ts): + + # Process the physical LOOP packet: + self.processLoopPacket(physicalPacket) + + # Calculate/get new archive data: + self.getArchiveData() + + # Now process it. Typically, this means generating reports, etc. + self.processArchiveData() + +#=============================================================================== +# Class StdService +#=============================================================================== + +class StdService(object): + + def __init__(self, engine): + self.engine = engine + + def setup(self, engine): + pass + + def preloop(self): + pass + + def processLoopPacket(self, physicalPacket): + pass + + def postArchiveData(self, rec): + pass + + def processArchiveData(self): + pass + +#=============================================================================== +# Class StdCatchUp +#=============================================================================== + +class StdCatchUp(StdService): + + def __init__(self, engine): + StdService.__init__(self, engine) + + def setup(self): + self.engine.getArchiveData() + +#=============================================================================== +# Class StdTimeSynch +#=============================================================================== + +class StdTimeSynch(StdService): + + def __init__(self, engine): + StdService.__init__(self, engine) + self.engine.last_synch_ts = 0 + + def preloop(self): + # Synch up the station's clock if it's been more than + # clock_check seconds since the last check: + now_ts = time.time() + if now_ts - self.engine.last_synch_ts >= self.engine.station.clock_check: + self.engine.station.setTime() + self.engine.last_synch_ts = now_ts + +#=============================================================================== +# Class StdWunderground +#=============================================================================== + +class StdWunderground(StdService): + import weewx.wunderground + + def __init__(self, engine): + StdService.__init__(self, engine) + + def setup(self): + wunder_dict = self.engine.config_dict.get('Wunderground') + + # Make sure we have a section [Wunderground] and that the station name + # and password exist before committing: + if wunder_dict and (wunder_dict.has_key('station') and + wunder_dict.has_key('password')): + self.queue = Queue.Queue() + archiveFilename = os.path.join(self.engine.config_dict['Station']['WEEWX_ROOT'], + self.engine.config_dict['Archive']['archive_file']) + t = weewx.wunderground.WunderThread(archiveFilename = archiveFilename, + queue = self.queue, + **wunder_dict) + else: + self.queue = None + + def postArchiveData(self, rec): + if self.queue: + self.queue.put(rec['dateTime']) + +#=============================================================================== +# Class StdProcess +#=============================================================================== + +class StdProcess(StdService): + + def __init__(self, engine): + StdService.__init__(self, engine) + + def processArchiveData(self): + """This function processes any new archive data""" + # Now process the data, using a separate thread + processThread = threading.Thread(target = weewx.processdata.processData, + args =(self.config_dict, )) + processThread.start() + + +#=============================================================================== +# Function main +#=============================================================================== + +def main(EngineClass = StdEngine, + service_list = ['weewx.wxengine.StdWunderground', + 'weewx.wxengine.StdCatchUp', + 'weewx.wxengine.StdTimeSynch', + 'weewx.wxengine.StdProcess']) : """Prepare the main loop and run it. - Mostly consists of a bunch of high-level preparatory calls, protected + Mostly consists of a bunch of high-level prepatory calls, protected by try blocks in the case of an exception.""" - if int(config_dict.get('debug', '0')): - # Get extra debug info. Set the logging mask for full debug info - syslog.setlogmask(syslog.LOG_UPTO(syslog.LOG_DEBUG)) - # Set the global debug flag: - weewx.debug = True - - # Set a default socket time out, in case FTP or HTTP hang: - timeout = int(config_dict.get('socket_timeout', '20')) - socket.setdefaulttimeout(timeout) - - # Get the hardware type from the configuration dictionary. - # This will be a string such as "VantagePro" - stationType = config_dict['Station']['station_type'] - - # Look for and load the module that handles this hardware type: - _moduleName = "weewx." + stationType - __import__(_moduleName) - hardware_module = sys.modules[_moduleName] - try: - # Now open up the weather station: - station = hardware_module.WxStation(config_dict[stationType]) - - except Exception, ex: - # Caught unrecoverable error. Log it, exit - syslog.syslog(syslog.LOG_CRIT, "main: Unable to open WX station hardware: %s" % ex) - syslog.syslog(syslog.LOG_CRIT, "main: Exiting.") - # Reraise the exception (this will eventually cause the program to exit) - raise - - try: - - # Create and initialize the WxEngine object, using the dictionary - # and the hardware station: - engine = WxEngine(config_dict, station) + # Create and initialize the MainLoop object + engine = EngineClass(service_list) except Exception, ex: # Caught unrecoverable error. Log it, exit @@ -117,196 +407,3 @@ def _get_object(module_class, *args, **kwargs): obj = mod(*args, **kwargs) return obj -class WxEngine(object): - - def __init__(self, config_dict, station, - setup_list = [('weewx.wxengine.StdSetup')], - preloop_list = [('weewx.wxengine.StdPreLoop')], - loop_list = [], - archive_list = [], - process_list = []): - - self.config_dict = config_dict - - self.station = station - - self.setup_list = setup_list - self.preloop_list = preloop_list - self.loop_list = loop_list - self.archive_list = archive_list - self.process_list = process_list - - def setup(self): - print "In setup()" - for module_class in self.setup_list: - obj = _get_object(module_class, self) - obj.start() - - def preloop(self): - print "In preloop()" - for module_class in self.preloop_list: - obj = _get_object(module_class, self) - obj.start() - - def processLoopPacket(self, physicalPacket): - print "In processLoopPacket()" - for module_class in self.loop_list: - obj = _get_object(module_class, self, physicalPacket) - obj.start() - - def getArchiveData(self): - print "In getArchiveData()" - for module_class in self.archive_list: - obj = _get_object(module_class, self) - obj.start() - - def processArchiveData(self): - print "In processArchiveData()" - for module_class in self.process_list: - obj = _get_object(module_class, self) - obj.start() - - def start(self): - - self.run() - - def run(self): - - self.setup() - - syslog.syslog(syslog.LOG_INFO, "wxengine: Starting main packet loop.") - - while True: - - self.preloop() - - # Next time to ask for archive records: - nextArchive_ts = (int(time.time() / self.station.archive_interval) + 1) * self.station.archive_interval + self.station.archive_delay - - # Get LOOP packets in big batches, then cancel as necessary when it's time - # to request an archive record: - for physicalPacket in self.station.genLoopPacketsUntil(nextArchive_ts): - - # Process the physical LOOP packet: - self.processLoopPacket(physicalPacket) - - # Calculate/get new archive data: - self.getArchiveData() - - # Now process it. Typically, this means generating reports, etc. - self.processArchiveData() - - -class StdSetup(object): - - def __init__(self, engine): - self.engine = engine - self.config_dict = engine.config_dict - - def start(self): - self.run() - - def run(self): - - archiveFilename = os.path.join(self.config_dict['Station']['WEEWX_ROOT'], - self.config_dict['Archive']['archive_file']) - self.engine.archive = weewx.archive.Archive(archiveFilename) - - statsFilename = os.path.join(self.config_dict['Station']['WEEWX_ROOT'], - self.config_dict['Stats']['stats_file']) - # statsDb is an instance of weewx.stats.StatsDb, which wraps the stats sqlite file - self.engine.statsDb = weewx.stats.StatsDb(statsFilename, - int(self.config_dict['Station'].get('heating_base', '65')), - int(self.config_dict['Station'].get('cooling_base', '65')), - int(self.config_dict['Station'].get('cache_loop_data', '1'))) - self.engine.last_synch_ts = 0 - - # Set up the main archive database: - self.setupArchiveDatabase() - - # Set up the statistical database: - self.setupStatsDatabase() - - # Set up the Weather Underground thread: - self.setupWeatherUnderground() - - # Catch up if possible - self.catchUpArchiveData() - - def setupArchiveDatabase(self): - """Setup the main database archive""" - # Configure it if necessary (this will do nothing if the database has - # already been configured): - self.engine.archive.config() - - - def setupStatsDatabase(self): - """Prepare the stats database""" - # Configure it if necessary (this will do nothing if the database has - # already been configured): - self.engine.statsDb.config(self.config_dict['Stats'].get('stats_types')) - - # Backfill it with data from the archive. This will do nothing if - # the stats database is already up-to-date. - weewx.stats.backfill(self.engine.archive, self.engine.statsDb) - - def catchUpArchiveData(self): - getArchiveData(self.engine.station, self.engine.archive, self.engine.statsDb) - - def setupWeatherUnderground(self): - """Set up the WU thread.""" - wunder_dict = self.config_dict.get('Wunderground') - - # Make sure we have a section [Wunderground] and that the station name - # and password exist before committing: - if wunder_dict and (wunder_dict.has_key('station') and - wunder_dict.has_key('password')): - weewx.wunderground.wunderQueue = Queue.Queue() - t = weewx.wunderground.WunderThread(**wunder_dict) - - -class StdPreLoop(object): - - def __init__(self, engine): - self.engine = engine - - def start(self): - self.run() - - def run(self): - print "In StdPreLoop.run()" - # Synch up the station's clock if it's been more than - # clock_check seconds since the last check: - now_ts = time.time() - if now_ts - self.engine.last_synch_ts >= self.engine.station.clock_check: - self.engine.station.setTime() - self.engine.last_synch_ts = now_ts - -def getArchiveData(station, archive, statsDb): - """This function gets or calculates new archive data""" - lastgood_ts = archive.lastGoodStamp() - - nrec = 0 - # Add all missed archive records since the last good record in the database - for rec in station.genArchivePackets(lastgood_ts) : - print"REC:-> ", weeutil.weeutil.timestamp_to_string(rec['dateTime']), rec['barometer'],\ - rec['outTemp'], rec['windSpeed'],\ - rec['windDir'], " <-" - archive.addRecord(rec) - statsDb.addArchiveRecord(rec) - if weewx.wunderground.wunderQueue: - weewx.wunderground.wunderQueue.put((archive, rec['dateTime'])) - nrec += 1 - - if nrec != 0: - syslog.syslog(syslog.LOG_INFO, "wxengine: %d new archive packets added to database" % nrec) - -#class StdProcessLoopPacket(object): -# -# def __init__(self, engine): -# self.engine = engine -# -# def start(self): -# self.run() -# -# def run(self): diff --git a/weewxd.py b/weewxd.py index 2c0ce398..4f8f130f 100755 --- a/weewxd.py +++ b/weewxd.py @@ -14,52 +14,7 @@ Entry point to the weewx weather system. """ -import sys -import os.path -import syslog -from optparse import OptionParser -import configobj - -import daemon - import weewx.wxengine -usagestr = """ - %prog config_path [--daemon] - - Entry point to the weewx weather program. Can be run from the command - line, or with the '--daemon' option, as a daemon. - -Arguments: - config_path: Path to the configuration file to be used. -""" - - -# Set defaults for the system logger: -syslog.openlog('weewx', syslog.LOG_PID|syslog.LOG_CONS) -syslog.setlogmask(syslog.LOG_UPTO(syslog.LOG_INFO)) - -parser = OptionParser(usage=usagestr) -parser.add_option("-d", "--daemon", action="store_true", dest="daemon", help="Run as a daemon") -(options, args) = parser.parse_args() - -if len(args) < 1: - sys.stderr.write("Missing argument(s).\n") - sys.stderr.write(parser.parse_args(["--help"])) - exit() - -if options.daemon: - daemon.daemonize(pidfile='/var/run/weewx.pid') - -# Try to open up the given configuration file. Declare an error if unable to. -try : - config_dict = configobj.ConfigObj(args[0], file_error=True) -except IOError: - sys.stderr.write("Unable to open configuration file %s" % args[0]) - syslog.syslog(syslog.LOG_CRIT, "main: Unable to open configuration file %s" % args[0]) - exit() - -syslog.syslog(syslog.LOG_INFO, "main: Using configuration file %s." % os.path.abspath(args[0])) - -# Prepare and enter the main loop: -weewx.wxengine.main(config_dict) +# Enter the main loop: +weewx.wxengine.main()