Started on a service oriented architecture.

This commit is contained in:
Tom Keffer
2009-12-20 00:45:56 +00:00
parent e68c3154cf
commit 00ee61d428
2 changed files with 329 additions and 277 deletions

View File

@@ -14,55 +14,345 @@ import time
import socket
import os.path
import Queue
import threading
from optparse import OptionParser
import configobj
import daemon
import weewx
import weewx.archive
import weewx.stats
import weewx.wunderground
import weeutil.weeutil
def main(config_dict):
usagestr = """
%prog config_path [--daemon]
Entry point to the weewx weather program. Can be run from the command
line, or with the '--daemon' option, as a daemon.
Arguments:
config_path: Path to the configuration file to be used.
"""
#===============================================================================
# Class StdEngine
#===============================================================================
class StdEngine(object):
def __init__(self, service_list):
"""Initialize an instance of StdEngine.
For each listed service in service_list, instantiates an instance of the class,
passing self as the only argument."""
self.service_obj = [_get_object(svc, self) for svc in service_list]
def setup(self):
"""Run before anything else."""
self.parseArgs()
# Set up the main archive database:
self.setupArchiveDatabase()
# Set up the statistical database:
self.setupStatsDatabase()
# Set up the weather station hardware:
self.setupStation()
# Set a default socket time out, in case FTP or HTTP hang:
timeout = int(self.config_dict.get('socket_timeout', '20'))
socket.setdefaulttimeout(timeout)
# Allow each service to run its setup:
for obj in self.service_obj:
obj.setup()
def parseArgs(self):
"""Parse any command line options."""
parser = OptionParser(usage=usagestr)
parser.add_option("-d", "--daemon", action="store_true", dest="daemon", help="Run as a daemon")
(options, args) = parser.parse_args()
if len(args) < 1:
sys.stderr.write("Missing argument(s).\n")
sys.stderr.write(parser.parse_args(["--help"]))
exit()
if options.daemon:
daemon.daemonize(pidfile='/var/run/weewx.pid')
# Try to open up the given configuration file. Declare an error if unable to.
try :
self.config_dict = configobj.ConfigObj(args[0], file_error=True)
except IOError:
sys.stderr.write("Unable to open configuration file %s" % args[0])
syslog.syslog(syslog.LOG_CRIT, "main: Unable to open configuration file %s" % args[0])
exit()
syslog.syslog(syslog.LOG_INFO, "main: Using configuration file %s." % os.path.abspath(args[0]))
if int(self.config_dict.get('debug', '0')):
# Get extra debug info. Set the logging mask for full debug info
syslog.setlogmask(syslog.LOG_UPTO(syslog.LOG_DEBUG))
# Set the global debug flag:
weewx.debug = True
def setupArchiveDatabase(self):
"""Setup the main database archive"""
archiveFilename = os.path.join(self.config_dict['Station']['WEEWX_ROOT'],
self.config_dict['Archive']['archive_file'])
self.archive = weewx.archive.Archive(archiveFilename)
# Configure it if necessary (this will do nothing if the database has
# already been configured):
self.archive.config()
def setupStatsDatabase(self):
"""Prepare the stats database"""
statsFilename = os.path.join(self.config_dict['Station']['WEEWX_ROOT'],
self.config_dict['Stats']['stats_file'])
# statsDb is an instance of weewx.stats.StatsDb, which wraps the stats sqlite file
self.statsDb = weewx.stats.StatsDb(statsFilename,
int(self.config_dict['Station'].get('heating_base', '65')),
int(self.config_dict['Station'].get('cooling_base', '65')),
int(self.config_dict['Station'].get('cache_loop_data', '1')))
# Configure it if necessary (this will do nothing if the database has
# already been configured):
self.statsDb.config(self.config_dict['Stats'].get('stats_types'))
# Backfill it with data from the archive. This will do nothing if
# the stats database is already up-to-date.
weewx.stats.backfill(self.archive, self.statsDb)
def setupStation(self):
"""Set up the weather station hardware."""
# Get the hardware type from the configuration dictionary.
# This will be a string such as "VantagePro"
stationType = self.config_dict['Station']['station_type']
# Look for and load the module that handles this hardware type:
_moduleName = "weewx." + stationType
__import__(_moduleName)
hardware_module = sys.modules[_moduleName]
try:
# Now open up the weather station:
self.station = hardware_module.WxStation(self.config_dict[stationType])
except Exception, ex:
# Caught unrecoverable error. Log it, exit
syslog.syslog(syslog.LOG_CRIT, "main: Unable to open WX station hardware: %s" % ex)
syslog.syslog(syslog.LOG_CRIT, "main: Exiting.")
# Reraise the exception (this will eventually cause the program to exit)
raise
def preloop(self):
"""Run every time before asking for LOOP packets"""
print "In preloop()"
for obj in self.service_obj:
obj.preloop()
def processLoopPacket(self, physicalPacket):
"""Run whenever a LOOP packet needs to be processed."""
print "In processLoopPacket()"
for obj in self.service_obj:
obj.processLoopPacket(physicalPacket)
def getArchiveData(self):
"""This function gets or calculates new archive data"""
print "In getArchiveData()"
lastgood_ts = self.archive.lastGoodStamp()
nrec = 0
# Add all missed archive records since the last good record in the database
for rec in self.station.genArchivePackets(lastgood_ts) :
print"REC:-> ", weeutil.weeutil.timestamp_to_string(rec['dateTime']), rec['barometer'],\
rec['outTemp'], rec['windSpeed'],\
rec['windDir'], " <-"
# Add the new record to the archive database and stats database:
self.archive.addRecord(rec)
self.statsDb.addArchiveRecord(rec)
# Give each service a chance to take a look at it:
self.postArchiveData(rec)
nrec += 1
if nrec != 0:
syslog.syslog(syslog.LOG_INFO, "wxengine: %d new archive packets added to database" % nrec)
def postArchiveData(self, rec):
"""Run whenever any new archive data appears."""
for obj in self.service_obj:
obj.postArchiveData(rec)
def processArchiveData(self):
"""Run after any archive data has been retrieved and put in the database."""
print "In processArchiveData()"
for obj in self.service_obj:
obj.processArchiveData()
def start(self):
self.run()
def run(self):
self.setup()
syslog.syslog(syslog.LOG_INFO, "wxengine: Starting main packet loop.")
while True:
self.preloop()
# Next time to ask for archive records:
nextArchive_ts = (int(time.time() / self.station.archive_interval) + 1) * self.station.archive_interval + self.station.archive_delay
for physicalPacket in self.station.genLoopPacketsUntil(nextArchive_ts):
# Process the physical LOOP packet:
self.processLoopPacket(physicalPacket)
# Calculate/get new archive data:
self.getArchiveData()
# Now process it. Typically, this means generating reports, etc.
self.processArchiveData()
#===============================================================================
# Class StdService
#===============================================================================
class StdService(object):
def __init__(self, engine):
self.engine = engine
def setup(self, engine):
pass
def preloop(self):
pass
def processLoopPacket(self, physicalPacket):
pass
def postArchiveData(self, rec):
pass
def processArchiveData(self):
pass
#===============================================================================
# Class StdCatchUp
#===============================================================================
class StdCatchUp(StdService):
def __init__(self, engine):
StdService.__init__(self, engine)
def setup(self):
self.engine.getArchiveData()
#===============================================================================
# Class StdTimeSynch
#===============================================================================
class StdTimeSynch(StdService):
def __init__(self, engine):
StdService.__init__(self, engine)
self.engine.last_synch_ts = 0
def preloop(self):
# Synch up the station's clock if it's been more than
# clock_check seconds since the last check:
now_ts = time.time()
if now_ts - self.engine.last_synch_ts >= self.engine.station.clock_check:
self.engine.station.setTime()
self.engine.last_synch_ts = now_ts
#===============================================================================
# Class StdWunderground
#===============================================================================
class StdWunderground(StdService):
import weewx.wunderground
def __init__(self, engine):
StdService.__init__(self, engine)
def setup(self):
wunder_dict = self.engine.config_dict.get('Wunderground')
# Make sure we have a section [Wunderground] and that the station name
# and password exist before committing:
if wunder_dict and (wunder_dict.has_key('station') and
wunder_dict.has_key('password')):
self.queue = Queue.Queue()
archiveFilename = os.path.join(self.engine.config_dict['Station']['WEEWX_ROOT'],
self.engine.config_dict['Archive']['archive_file'])
t = weewx.wunderground.WunderThread(archiveFilename = archiveFilename,
queue = self.queue,
**wunder_dict)
else:
self.queue = None
def postArchiveData(self, rec):
if self.queue:
self.queue.put(rec['dateTime'])
#===============================================================================
# Class StdProcess
#===============================================================================
class StdProcess(StdService):
def __init__(self, engine):
StdService.__init__(self, engine)
def processArchiveData(self):
"""This function processes any new archive data"""
# Now process the data, using a separate thread
processThread = threading.Thread(target = weewx.processdata.processData,
args =(self.config_dict, ))
processThread.start()
#===============================================================================
# Function main
#===============================================================================
def main(EngineClass = StdEngine,
service_list = ['weewx.wxengine.StdWunderground',
'weewx.wxengine.StdCatchUp',
'weewx.wxengine.StdTimeSynch',
'weewx.wxengine.StdProcess']) :
"""Prepare the main loop and run it.
Mostly consists of a bunch of high-level preparatory calls, protected
Mostly consists of a bunch of high-level prepatory calls, protected
by try blocks in the case of an exception."""
if int(config_dict.get('debug', '0')):
# Get extra debug info. Set the logging mask for full debug info
syslog.setlogmask(syslog.LOG_UPTO(syslog.LOG_DEBUG))
# Set the global debug flag:
weewx.debug = True
# Set a default socket time out, in case FTP or HTTP hang:
timeout = int(config_dict.get('socket_timeout', '20'))
socket.setdefaulttimeout(timeout)
# Get the hardware type from the configuration dictionary.
# This will be a string such as "VantagePro"
stationType = config_dict['Station']['station_type']
# Look for and load the module that handles this hardware type:
_moduleName = "weewx." + stationType
__import__(_moduleName)
hardware_module = sys.modules[_moduleName]
try:
# Now open up the weather station:
station = hardware_module.WxStation(config_dict[stationType])
except Exception, ex:
# Caught unrecoverable error. Log it, exit
syslog.syslog(syslog.LOG_CRIT, "main: Unable to open WX station hardware: %s" % ex)
syslog.syslog(syslog.LOG_CRIT, "main: Exiting.")
# Reraise the exception (this will eventually cause the program to exit)
raise
try:
# Create and initialize the WxEngine object, using the dictionary
# and the hardware station:
engine = WxEngine(config_dict, station)
# Create and initialize the MainLoop object
engine = EngineClass(service_list)
except Exception, ex:
# Caught unrecoverable error. Log it, exit
@@ -117,196 +407,3 @@ def _get_object(module_class, *args, **kwargs):
obj = mod(*args, **kwargs)
return obj
class WxEngine(object):
def __init__(self, config_dict, station,
setup_list = [('weewx.wxengine.StdSetup')],
preloop_list = [('weewx.wxengine.StdPreLoop')],
loop_list = [],
archive_list = [],
process_list = []):
self.config_dict = config_dict
self.station = station
self.setup_list = setup_list
self.preloop_list = preloop_list
self.loop_list = loop_list
self.archive_list = archive_list
self.process_list = process_list
def setup(self):
print "In setup()"
for module_class in self.setup_list:
obj = _get_object(module_class, self)
obj.start()
def preloop(self):
print "In preloop()"
for module_class in self.preloop_list:
obj = _get_object(module_class, self)
obj.start()
def processLoopPacket(self, physicalPacket):
print "In processLoopPacket()"
for module_class in self.loop_list:
obj = _get_object(module_class, self, physicalPacket)
obj.start()
def getArchiveData(self):
print "In getArchiveData()"
for module_class in self.archive_list:
obj = _get_object(module_class, self)
obj.start()
def processArchiveData(self):
print "In processArchiveData()"
for module_class in self.process_list:
obj = _get_object(module_class, self)
obj.start()
def start(self):
self.run()
def run(self):
self.setup()
syslog.syslog(syslog.LOG_INFO, "wxengine: Starting main packet loop.")
while True:
self.preloop()
# Next time to ask for archive records:
nextArchive_ts = (int(time.time() / self.station.archive_interval) + 1) * self.station.archive_interval + self.station.archive_delay
# Get LOOP packets in big batches, then cancel as necessary when it's time
# to request an archive record:
for physicalPacket in self.station.genLoopPacketsUntil(nextArchive_ts):
# Process the physical LOOP packet:
self.processLoopPacket(physicalPacket)
# Calculate/get new archive data:
self.getArchiveData()
# Now process it. Typically, this means generating reports, etc.
self.processArchiveData()
class StdSetup(object):
def __init__(self, engine):
self.engine = engine
self.config_dict = engine.config_dict
def start(self):
self.run()
def run(self):
archiveFilename = os.path.join(self.config_dict['Station']['WEEWX_ROOT'],
self.config_dict['Archive']['archive_file'])
self.engine.archive = weewx.archive.Archive(archiveFilename)
statsFilename = os.path.join(self.config_dict['Station']['WEEWX_ROOT'],
self.config_dict['Stats']['stats_file'])
# statsDb is an instance of weewx.stats.StatsDb, which wraps the stats sqlite file
self.engine.statsDb = weewx.stats.StatsDb(statsFilename,
int(self.config_dict['Station'].get('heating_base', '65')),
int(self.config_dict['Station'].get('cooling_base', '65')),
int(self.config_dict['Station'].get('cache_loop_data', '1')))
self.engine.last_synch_ts = 0
# Set up the main archive database:
self.setupArchiveDatabase()
# Set up the statistical database:
self.setupStatsDatabase()
# Set up the Weather Underground thread:
self.setupWeatherUnderground()
# Catch up if possible
self.catchUpArchiveData()
def setupArchiveDatabase(self):
"""Setup the main database archive"""
# Configure it if necessary (this will do nothing if the database has
# already been configured):
self.engine.archive.config()
def setupStatsDatabase(self):
"""Prepare the stats database"""
# Configure it if necessary (this will do nothing if the database has
# already been configured):
self.engine.statsDb.config(self.config_dict['Stats'].get('stats_types'))
# Backfill it with data from the archive. This will do nothing if
# the stats database is already up-to-date.
weewx.stats.backfill(self.engine.archive, self.engine.statsDb)
def catchUpArchiveData(self):
getArchiveData(self.engine.station, self.engine.archive, self.engine.statsDb)
def setupWeatherUnderground(self):
"""Set up the WU thread."""
wunder_dict = self.config_dict.get('Wunderground')
# Make sure we have a section [Wunderground] and that the station name
# and password exist before committing:
if wunder_dict and (wunder_dict.has_key('station') and
wunder_dict.has_key('password')):
weewx.wunderground.wunderQueue = Queue.Queue()
t = weewx.wunderground.WunderThread(**wunder_dict)
class StdPreLoop(object):
def __init__(self, engine):
self.engine = engine
def start(self):
self.run()
def run(self):
print "In StdPreLoop.run()"
# Synch up the station's clock if it's been more than
# clock_check seconds since the last check:
now_ts = time.time()
if now_ts - self.engine.last_synch_ts >= self.engine.station.clock_check:
self.engine.station.setTime()
self.engine.last_synch_ts = now_ts
def getArchiveData(station, archive, statsDb):
"""This function gets or calculates new archive data"""
lastgood_ts = archive.lastGoodStamp()
nrec = 0
# Add all missed archive records since the last good record in the database
for rec in station.genArchivePackets(lastgood_ts) :
print"REC:-> ", weeutil.weeutil.timestamp_to_string(rec['dateTime']), rec['barometer'],\
rec['outTemp'], rec['windSpeed'],\
rec['windDir'], " <-"
archive.addRecord(rec)
statsDb.addArchiveRecord(rec)
if weewx.wunderground.wunderQueue:
weewx.wunderground.wunderQueue.put((archive, rec['dateTime']))
nrec += 1
if nrec != 0:
syslog.syslog(syslog.LOG_INFO, "wxengine: %d new archive packets added to database" % nrec)
#class StdProcessLoopPacket(object):
#
# def __init__(self, engine):
# self.engine = engine
#
# def start(self):
# self.run()
#
# def run(self):

View File

@@ -14,52 +14,7 @@
Entry point to the weewx weather system.
"""
import sys
import os.path
import syslog
from optparse import OptionParser
import configobj
import daemon
import weewx.wxengine
usagestr = """
%prog config_path [--daemon]
Entry point to the weewx weather program. Can be run from the command
line, or with the '--daemon' option, as a daemon.
Arguments:
config_path: Path to the configuration file to be used.
"""
# Set defaults for the system logger:
syslog.openlog('weewx', syslog.LOG_PID|syslog.LOG_CONS)
syslog.setlogmask(syslog.LOG_UPTO(syslog.LOG_INFO))
parser = OptionParser(usage=usagestr)
parser.add_option("-d", "--daemon", action="store_true", dest="daemon", help="Run as a daemon")
(options, args) = parser.parse_args()
if len(args) < 1:
sys.stderr.write("Missing argument(s).\n")
sys.stderr.write(parser.parse_args(["--help"]))
exit()
if options.daemon:
daemon.daemonize(pidfile='/var/run/weewx.pid')
# Try to open up the given configuration file. Declare an error if unable to.
try :
config_dict = configobj.ConfigObj(args[0], file_error=True)
except IOError:
sys.stderr.write("Unable to open configuration file %s" % args[0])
syslog.syslog(syslog.LOG_CRIT, "main: Unable to open configuration file %s" % args[0])
exit()
syslog.syslog(syslog.LOG_INFO, "main: Using configuration file %s." % os.path.abspath(args[0]))
# Prepare and enter the main loop:
weewx.wxengine.main(config_dict)
# Enter the main loop:
weewx.wxengine.main()