From f4bcaf73bbf82f69bdf0e79426df90214122a2f3 Mon Sep 17 00:00:00 2001 From: Tom Keffer Date: Sat, 9 Nov 2013 15:49:03 +0000 Subject: [PATCH] Implemented StdWOW --- bin/weewx/restx.py | 229 ++++++++++++++++++++++++++++++--------------- 1 file changed, 152 insertions(+), 77 deletions(-) diff --git a/bin/weewx/restx.py b/bin/weewx/restx.py index 4f580be3..0fcbaf40 100644 --- a/bin/weewx/restx.py +++ b/bin/weewx/restx.py @@ -32,7 +32,7 @@ class BadLogin(StandardError): #=============================================================================== class StdRESTbase(weewx.wxengine.StdService): - """Base class for RESTful weewx services.""" + """Base class for the RESTful weewx services.""" # # This class implements a generic protocol processing model: @@ -56,6 +56,8 @@ class StdRESTbase(weewx.wxengine.StdService): interval: If non-None, how long to wait from the last post before accepting a new post. + + protocol: A string holding the protocol name I am implementing. """ super(StdRESTbase, self).__init__(engine, config_dict) @@ -69,6 +71,8 @@ class StdRESTbase(weewx.wxengine.StdService): self.lastpost= None def init_info(self, site_dict): + """Extract information out of the site dictionary or, if unavailable, the engine's + station info structure.""" self.latitude = float(site_dict.get('latitude', self.engine.stn_info.latitude_f)) self.longitude = float(site_dict.get('longitude', self.engine.stn_info.longitude_f)) self.hardware = site_dict.get('station_type', self.engine.stn_info.hardware) @@ -76,9 +80,11 @@ class StdRESTbase(weewx.wxengine.StdService): self.station_url = site_dict.get('station_url', self.engine.stn_info.station_url) def init_loop_queue(self): + """Initiate the LOOP queue. """ self.loop_queue = Queue.Queue() def init_archive_queue(self): + """Initiate the archive queue.""" self.archive_queue = Queue.Queue() def shutDown(self): @@ -88,6 +94,7 @@ class StdRESTbase(weewx.wxengine.StdService): @staticmethod def shutDown_thread(q, t): + """Function to shut down a thread.""" if q: # Put a None in the queue. This will signal to the thread to shutdown q.put(None) @@ -100,7 +107,6 @@ class StdRESTbase(weewx.wxengine.StdService): def skip_this_post(self, time_ts): """Check whether the post is current""" - # Don't post if this record is too old _how_old = time.time() - time_ts if self.stale and _how_old > self.stale: @@ -114,7 +120,7 @@ class StdRESTbase(weewx.wxengine.StdService): def process_record(self, record): """Generic processing function that follows the protocol model.""" - + # See whether this post should be skipped. try: self.skip_this_post(record['dateTime']) except SkippedPost, e: @@ -136,13 +142,13 @@ class StdRESTbase(weewx.wxengine.StdService): This is a general version that works for: - WeatherUnderground - PWSweather + - WOW - CWOP It can be overridden and specialized for additional protocols. returns: A dictionary of weather values""" _time_ts = record['dateTime'] - _sod_ts = weeutil.weeutil.startOfDay(_time_ts) # Make a copy of the record, then start adding to it: @@ -193,11 +199,12 @@ class StdRESTbase(weewx.wxengine.StdService): class Ambient(StdRESTbase): """Base class for weather sites that use the Ambient protocol.""" - # Types and formats of the data to be published: - _formats = {'dateTime' : ('dateutc', lambda _v : urllib.quote(datetime.datetime.utcfromtimestamp(_v).isoformat('+'), '-+')), + # Types and formats of the data to be published, using the Ambient notation + _formats = {'dateTime' : ('dateutc', + lambda _v : urllib.quote(datetime.datetime.utcfromtimestamp(_v).isoformat('+'), '-+')), 'action' : ('action', '%s'), - 'ID' : ('ID', '%s'), - 'PASSWORD' : ('PASSWORD', '%s'), + 'station' : ('ID', '%s'), + 'password' : ('PASSWORD', '%s'), 'softwaretype': ('softwaretype', '%s'), 'barometer' : ('baromin', '%.3f'), 'outTemp' : ('tempf', '%.1f'), @@ -211,10 +218,13 @@ class Ambient(StdRESTbase): 'radiation' : ('solarradiation', '%.2f'), 'UV' : ('UV', '%.2f')} - def __init__(self, engine, config_dict, **ambient_dict): + def __init__(self, engine, config_dict, format_dict=_formats, **ambient_dict): """Base class that implements the Ambient protocol. Named parameters: + format_dict: A dictionary containing the format encodings to be used. + The default is Ambient._formats + station: The station ID (eg. KORHOODR3) [Required] password: Password for the station [Required] @@ -246,7 +256,8 @@ class Ambient(StdRESTbase): (Always 0 for rapidfire posts). Default is infinite. """ - super(Ambient, self).__init__(engine, config_dict, **ambient_dict) + super(Ambient, self).__init__(engine, config_dict, + **ambient_dict) # Try extracting the required keywords. If this fails, an exception # of type KeyError will be raised. Be prepared to catch it. @@ -260,10 +271,15 @@ class Ambient(StdRESTbase): # If we got here, we have the minimum necessary. + # Save the format encoding to be used + self.format_dict = format_dict + # It's not actually used by the Ambient protocol, but, for completeness, # initialize the site-specific information: self.init_info(ambient_dict) + # The default is not not do an archive post if a rapidfire post has been + # specified, but this can be overridden do_rapidfire_post = to_bool(ambient_dict.get('rapidfire', False)) do_archive_post = to_bool(ambient_dict.get('archive_post', not do_rapidfire_post)) @@ -317,10 +333,10 @@ class Ambient(StdRESTbase): return self.process_record(event.record) def augment_protocol(self, record): - """Augment a record with the Ambient-specific keywords.""" - record['action'] = 'updateraw' - record['ID'] = self.station - record['PASSWORD'] = self.password + """Augment a record with the Ambient-specific data fields.""" + record['action'] = 'updateraw' + record['station'] = self.station + record['password'] = self.password record['softwaretype'] = "weewx-%s" % weewx.__version__ def format_protocol(self, record): @@ -343,7 +359,7 @@ class Ambient(StdRESTbase): _datadict['usUnits'] = weewx.US # Reformat according to the Ambient protocol: - _post_dict = reformat_dict(_datadict, Ambient._formats) + _post_dict = reformat_dict(_datadict, self.format_dict) # Form the full URL _url = self.archive_url + '?' + weeutil.weeutil.urlencode(_post_dict) @@ -367,14 +383,15 @@ class StdWunderground(Ambient): # First extract the required parameters. If one of them is missing, # a KeyError exception will occur. Be prepared to catch it. try: - # Extract the dictionary with the WU options: + # Extract a copy of the dictionary with the WU options: ambient_dict=dict(config_dict['StdRESTful']['Wunderground']) ambient_dict.setdefault('rapidfire_url', StdWunderground.rapidfire_url) ambient_dict.setdefault('archive_url', StdWunderground.archive_url) ambient_dict.setdefault('name', 'Wunderground') - super(StdWunderground, self).__init__(engine, config_dict, **ambient_dict) + super(StdWunderground, self).__init__(engine, config_dict, + **ambient_dict) syslog.syslog(syslog.LOG_DEBUG, "restx: Data will be posted to Wunderground") - except ServiceError, e: + except (KeyError, ServiceError), e: syslog.syslog(syslog.LOG_DEBUG, "restx: Data will not be posted to Wunderground") syslog.syslog(syslog.LOG_DEBUG, " **** Reason: %s" % e) @@ -394,12 +411,62 @@ class StdPWSweather(Ambient): ambient_dict=dict(config_dict['StdRESTful']['PWSweather']) ambient_dict.setdefault('archive_url', StdPWSweather.archive_url) ambient_dict.setdefault('name', 'PWSweather') - super(StdPWSweather, self).__init__(engine, config_dict, **ambient_dict) + super(StdPWSweather, self).__init__(engine, config_dict, + **ambient_dict) syslog.syslog(syslog.LOG_DEBUG, "restx: Data will be posted to PWSweather") - except ServiceError, e: + except (KeyError, ServiceError), e: syslog.syslog(syslog.LOG_DEBUG, "restx: Data will not be posted to PWSweather") syslog.syslog(syslog.LOG_DEBUG, " **** Reason: %s" % e) +#=============================================================================== +# Class StdWOW +#=============================================================================== + +class StdWOW(Ambient): + + """Upload using the UK Met Office's WOW protocol. + + For details of the WOW upload protocol, see + http://wow.metoffice.gov.uk/support/dataformats#dataFileUpload + """ + + # Types and formats of the data to be published: + _formats = {'dateTime' : ('dateutc', lambda _v : urllib.quote(datetime.datetime.utcfromtimestamp(_v).isoformat('+'), '-+')), + 'station' : ('siteid', '%s'), + 'password' : ('siteAuthenticationKey', '%s'), + 'softwaretype': ('softwaretype', '%s'), + 'barometer' : ('baromin', '%.1f'), + 'outTemp' : ('tempf', '%.1f'), + 'outHumidity' : ('humidity', '%.0f'), + 'windSpeed' : ('windspeedmph', '%.0f'), + 'windDir' : ('winddir', '%.0f'), + 'windGust' : ('windgustmph', '%.0f'), + 'windGustDir' : ('windgustdir', '%.0f'), + 'dewpoint' : ('dewptf', '%.1f'), + 'hourRain' : ('rainin', '%.2f'), + 'dayRain' : ('dailyrainin', '%.2f')} + + # The URL used by WOW: + archive_url = "http://wow.metoffice.gov.uk/automaticreading" + + def __init__(self, engine, config_dict): + + try: + ambient_dict=dict(config_dict['StdRESTful']['WOW']) + ambient_dict.setdefault('archive_url', StdWOW.archive_url) + ambient_dict.setdefault('name', 'WOW') + if ambient_dict.has_key('siteid'): + ambient_dict['station'] = ambient_dict['siteid'] + if ambient_dict.has_key('siteAuthenticationKey'): + ambient_dict['password'] = ambient_dict['siteAuthenticationKey'] + super(StdWOW, self).__init__(engine, config_dict, + format_dict=StdWOW._formats, + **ambient_dict) + syslog.syslog(syslog.LOG_DEBUG, "restx: Data will be posted to WOW") + except (KeyError, ServiceError), e: + syslog.syslog(syslog.LOG_DEBUG, "restx: Data will not be posted to WOW") + syslog.syslog(syslog.LOG_DEBUG, " **** Reason: %s" % e) + #=============================================================================== # Class PostRequest #=============================================================================== @@ -465,6 +532,13 @@ class PostRequest(threading.Thread): # in case there's a network problem. try: _response = urllib2.urlopen(request) + except urllib2.HTTPError, e: + # WOW signals a bad login with a HTML Error 403 code: + if e.code == 403: + raise BadLogin(e) + else: + syslog.syslog(syslog.LOG_DEBUG, "restx: Failed attempt #%d to upload to %s" % (_count+1, self.name)) + syslog.syslog(syslog.LOG_DEBUG, " **** Reason: %s" % (e,)) except (urllib2.URLError, socket.error, httplib.BadStatusLine), e: # Unsuccessful. Log it and go around again for another try syslog.syslog(syslog.LOG_DEBUG, "restx: Failed attempt #%d to upload to %s" % (_count+1, self.name)) @@ -501,12 +575,13 @@ class StdCWOP(StdRESTbase): # First extract the required parameters. If one of them is missing, # a KeyError exception will occur. Be prepared to catch it. try: - # Extract the CWOP dictionary: + # Extract a copy of the CWOP dictionary: cwop_dict=dict(config_dict['StdRESTful']['CWOP']) cwop_dict.setdefault('name', 'CWOP') cwop_dict['stale'] = to_int(cwop_dict.get('stale', 1800)) cwop_dict['interval'] = to_int(cwop_dict.get('interval')) - super(StdCWOP, self).__init__(engine, config_dict, **cwop_dict) + super(StdCWOP, self).__init__(engine, config_dict, + **cwop_dict) # Extract the station and (if necessary) passcode self.station = cwop_dict['station'].upper() @@ -550,80 +625,80 @@ class StdCWOP(StdRESTbase): return self.process_record(event.record) def get_login_string(self): - login = "user %s pass %s vers weewx %s\r\n" % (self.station, self.passcode, weewx.__version__) - return login + _login = "user %s pass %s vers weewx %s\r\n" % (self.station, self.passcode, weewx.__version__) + return _login def get_tnc_packet(self, in_record): """Form the TNC2 packet used by CWOP.""" # Make sure the record is in US units: if in_record['usUnits'] == weewx.US: - record = in_record + _record = in_record else: - record = weewx.units.StdUnitConverters[weewx.US].convertDict(in_record) - record['usUnits'] = weewx.US + _record = weewx.units.StdUnitConverters[weewx.US].convertDict(in_record) + _record['usUnits'] = weewx.US # Preamble to the TNC packet: - prefix = "%s>APRS,TCPIP*:" % (self.station,) + _prefix = "%s>APRS,TCPIP*:" % (self.station,) # Time: - time_tt = time.gmtime(record['dateTime']) - time_str = time.strftime("@%d%H%Mz", time_tt) + _time_tt = time.gmtime(_record['dateTime']) + _time_str = time.strftime("@%d%H%Mz", _time_tt) # Position: - lat_str = weeutil.weeutil.latlon_string(self.latitude, ('N', 'S'), 'lat') - lon_str = weeutil.weeutil.latlon_string(self.longitude, ('E', 'W'), 'lon') - latlon_str = '%s%s%s/%s%s%s' % (lat_str + lon_str) + _lat_str = weeutil.weeutil.latlon_string(self.latitude, ('N', 'S'), 'lat') + _lon_str = weeutil.weeutil.latlon_string(self.longitude, ('E', 'W'), 'lon') + _latlon_str = '%s%s%s/%s%s%s' % (_lat_str + _lon_str) # Wind and temperature - wt_list = [] - for obs_type in ['windDir', 'windSpeed', 'windGust', 'outTemp']: - v = record.get(obs_type) - wt_list.append("%03d" % v if v is not None else '...') - wt_str = "_%s/%sg%st%s" % tuple(wt_list) + _wt_list = [] + for _obs_type in ['windDir', 'windSpeed', 'windGust', 'outTemp']: + _v = _record.get(_obs_type) + _wt_list.append("%03d" % _v if _v is not None else '...') + _wt_str = "_%s/%sg%st%s" % tuple(_wt_list) # Rain - rain_list = [] - for obs_type in ['hourRain', 'rain24', 'dayRain']: - v = record.get(obs_type) - rain_list.append("%03d" % (v * 100.0) if v is not None else '...') - rain_str = "r%sp%sP%s" % tuple(rain_list) + _rain_list = [] + for _obs_type in ['hourRain', 'rain24', 'dayRain']: + _v = _record.get(_obs_type) + _rain_list.append("%03d" % (_v * 100.0) if _v is not None else '...') + _rain_str = "r%sp%sP%s" % tuple(_rain_list) # Barometer: - baro = record.get('altimeter') - if baro is None: - baro_str = "b....." + _baro = _record.get('altimeter') + if _baro is None: + _baro_str = "b....." else: # While everything else in the CWOP protocol is in US Customary, they # want the barometer in millibars. - baro_vt = weewx.units.convert((baro, 'inHg', 'group_pressure'), 'mbar') - baro_str = "b%05d" % (baro_vt[0] * 10.0) + _baro_vt = weewx.units.convert((_baro, 'inHg', 'group_pressure'), 'mbar') + _baro_str = "b%05d" % (_baro_vt[0] * 10.0) # Humidity: - humidity = record.get('outHumidity') - if humidity is None: - humid_str = "h.." + _humidity = _record.get('outHumidity') + if _humidity is None: + _humid_str = "h.." else: - humid_str = ("h%02d" % humidity) if humidity < 100.0 else "h00" + _humid_str = ("h%02d" % _humidity) if _humidity < 100.0 else "h00" # Radiation: - radiation = record.get('radiation') - if radiation is None: - radiation_str = "" - elif radiation < 1000.0: - radiation_str = "L%03d" % radiation - elif radiation < 2000.0: - radiation_str = "l%03d" % (radiation - 1000) + _radiation = _record.get('radiation') + if _radiation is None: + _radiation_str = "" + elif _radiation < 1000.0: + _radiation_str = "L%03d" % _radiation + elif _radiation < 2000.0: + _radiation_str = "l%03d" % (_radiation - 1000) else: - radiation_str = "" + _radiation_str = "" # Station equipment - equipment_str = ".weewx-%s-%s" % (weewx.__version__, self.hardware) + _equipment_str = ".weewx-%s-%s" % (weewx.__version__, self.hardware) - tnc_packet = prefix + time_str + latlon_str + wt_str + rain_str + \ - baro_str + humid_str + radiation_str + equipment_str + "\r\n" + _tnc_packet = _prefix + _time_str + _latlon_str + _wt_str + _rain_str + \ + _baro_str + _humid_str + _radiation_str + _equipment_str + "\r\n" - return tnc_packet + return _tnc_packet def format_protocol(self, record): # Get the login string @@ -681,17 +756,17 @@ class PostTNC(threading.Thread): _time_str = timestamp_to_string(_timestamp) syslog.syslog(syslog.LOG_INFO, "restx: Published record %s to %s" % (_time_str, self.name)) - def send_packet(self, _login, _tnc_packet): + def send_packet(self, login, tnc_packet): # Get a socket connection: _sock = self._get_connect() try: # Send the login: - self._send(_sock, _login) + self._send(_sock, login) # And then the packet - self._send(_sock, _tnc_packet) + self._send(_sock, tnc_packet) finally: _sock.close() @@ -699,27 +774,27 @@ class PostTNC(threading.Thread): # Go through the list of known server:ports, looking for # a connection that works: - for serv_addr_str in self.server: - server, port = serv_addr_str.split(":") - port = int(port) + for _serv_addr_str in self.server: + _server, _port_str = _serv_addr_str.split(":") + _port = int(_port_str) for _count in range(self.max_tries): try: - sock = socket.socket() - sock.connect((server, port)) + _sock = socket.socket() + _sock.connect((_server, _port)) except socket.error, e: # Unsuccessful. Log it and try again - syslog.syslog(syslog.LOG_DEBUG, "restx: Connection attempt #%d failed to %s server %s:%d" % (_count + 1, self.name, server, port)) + syslog.syslog(syslog.LOG_DEBUG, "restx: Connection attempt #%d failed to %s server %s:%d" % (_count + 1, self.name, _server, _port)) syslog.syslog(syslog.LOG_DEBUG, " **** Reason: %s" % (e,)) else: - syslog.syslog(syslog.LOG_DEBUG, "restx: Connected to %s server %s:%d" % (self.name, server, port)) - return sock + syslog.syslog(syslog.LOG_DEBUG, "restx: Connected to %s server %s:%d" % (self.name, _server, _port)) + return _sock # Couldn't connect on this attempt. Close it, try again. try: - sock.close() + _sock.close() except: pass # If we got here, that server didn't work. Log it and go on to the next one. - syslog.syslog(syslog.LOG_DEBUG, "restx: Unable to connect to %s server %s:%d" % (self.name, server, port)) + syslog.syslog(syslog.LOG_DEBUG, "restx: Unable to connect to %s server %s:%d" % (self.name, _server, _port)) # If we got here. None of the servers worked. Raise an exception raise IOError, "Unable to obtain a socket connection to %s" % (self.name,)