Implemented StdWOW

This commit is contained in:
Tom Keffer
2013-11-09 15:49:03 +00:00
parent a5c32181d6
commit f4bcaf73bb

View File

@@ -32,7 +32,7 @@ class BadLogin(StandardError):
#===============================================================================
class StdRESTbase(weewx.wxengine.StdService):
"""Base class for RESTful weewx services."""
"""Base class for the RESTful weewx services."""
#
# This class implements a generic protocol processing model:
@@ -56,6 +56,8 @@ class StdRESTbase(weewx.wxengine.StdService):
interval: If non-None, how long to wait from the last post before accepting
a new post.
protocol: A string holding the protocol name I am implementing.
"""
super(StdRESTbase, self).__init__(engine, config_dict)
@@ -69,6 +71,8 @@ class StdRESTbase(weewx.wxengine.StdService):
self.lastpost= None
def init_info(self, site_dict):
"""Extract information out of the site dictionary or, if unavailable, the engine's
station info structure."""
self.latitude = float(site_dict.get('latitude', self.engine.stn_info.latitude_f))
self.longitude = float(site_dict.get('longitude', self.engine.stn_info.longitude_f))
self.hardware = site_dict.get('station_type', self.engine.stn_info.hardware)
@@ -76,9 +80,11 @@ class StdRESTbase(weewx.wxengine.StdService):
self.station_url = site_dict.get('station_url', self.engine.stn_info.station_url)
def init_loop_queue(self):
"""Initiate the LOOP queue. """
self.loop_queue = Queue.Queue()
def init_archive_queue(self):
"""Initiate the archive queue."""
self.archive_queue = Queue.Queue()
def shutDown(self):
@@ -88,6 +94,7 @@ class StdRESTbase(weewx.wxengine.StdService):
@staticmethod
def shutDown_thread(q, t):
"""Function to shut down a thread."""
if q:
# Put a None in the queue. This will signal to the thread to shutdown
q.put(None)
@@ -100,7 +107,6 @@ class StdRESTbase(weewx.wxengine.StdService):
def skip_this_post(self, time_ts):
"""Check whether the post is current"""
# Don't post if this record is too old
_how_old = time.time() - time_ts
if self.stale and _how_old > self.stale:
@@ -114,7 +120,7 @@ class StdRESTbase(weewx.wxengine.StdService):
def process_record(self, record):
"""Generic processing function that follows the protocol model."""
# See whether this post should be skipped.
try:
self.skip_this_post(record['dateTime'])
except SkippedPost, e:
@@ -136,13 +142,13 @@ class StdRESTbase(weewx.wxengine.StdService):
This is a general version that works for:
- WeatherUnderground
- PWSweather
- WOW
- CWOP
It can be overridden and specialized for additional protocols.
returns: A dictionary of weather values"""
_time_ts = record['dateTime']
_sod_ts = weeutil.weeutil.startOfDay(_time_ts)
# Make a copy of the record, then start adding to it:
@@ -193,11 +199,12 @@ class StdRESTbase(weewx.wxengine.StdService):
class Ambient(StdRESTbase):
"""Base class for weather sites that use the Ambient protocol."""
# Types and formats of the data to be published:
_formats = {'dateTime' : ('dateutc', lambda _v : urllib.quote(datetime.datetime.utcfromtimestamp(_v).isoformat('+'), '-+')),
# Types and formats of the data to be published, using the Ambient notation
_formats = {'dateTime' : ('dateutc',
lambda _v : urllib.quote(datetime.datetime.utcfromtimestamp(_v).isoformat('+'), '-+')),
'action' : ('action', '%s'),
'ID' : ('ID', '%s'),
'PASSWORD' : ('PASSWORD', '%s'),
'station' : ('ID', '%s'),
'password' : ('PASSWORD', '%s'),
'softwaretype': ('softwaretype', '%s'),
'barometer' : ('baromin', '%.3f'),
'outTemp' : ('tempf', '%.1f'),
@@ -211,10 +218,13 @@ class Ambient(StdRESTbase):
'radiation' : ('solarradiation', '%.2f'),
'UV' : ('UV', '%.2f')}
def __init__(self, engine, config_dict, **ambient_dict):
def __init__(self, engine, config_dict, format_dict=_formats, **ambient_dict):
"""Base class that implements the Ambient protocol.
Named parameters:
format_dict: A dictionary containing the format encodings to be used.
The default is Ambient._formats
station: The station ID (eg. KORHOODR3) [Required]
password: Password for the station [Required]
@@ -246,7 +256,8 @@ class Ambient(StdRESTbase):
(Always 0 for rapidfire posts). Default is infinite.
"""
super(Ambient, self).__init__(engine, config_dict, **ambient_dict)
super(Ambient, self).__init__(engine, config_dict,
**ambient_dict)
# Try extracting the required keywords. If this fails, an exception
# of type KeyError will be raised. Be prepared to catch it.
@@ -260,10 +271,15 @@ class Ambient(StdRESTbase):
# If we got here, we have the minimum necessary.
# Save the format encoding to be used
self.format_dict = format_dict
# It's not actually used by the Ambient protocol, but, for completeness,
# initialize the site-specific information:
self.init_info(ambient_dict)
# The default is not not do an archive post if a rapidfire post has been
# specified, but this can be overridden
do_rapidfire_post = to_bool(ambient_dict.get('rapidfire', False))
do_archive_post = to_bool(ambient_dict.get('archive_post', not do_rapidfire_post))
@@ -317,10 +333,10 @@ class Ambient(StdRESTbase):
return self.process_record(event.record)
def augment_protocol(self, record):
"""Augment a record with the Ambient-specific keywords."""
record['action'] = 'updateraw'
record['ID'] = self.station
record['PASSWORD'] = self.password
"""Augment a record with the Ambient-specific data fields."""
record['action'] = 'updateraw'
record['station'] = self.station
record['password'] = self.password
record['softwaretype'] = "weewx-%s" % weewx.__version__
def format_protocol(self, record):
@@ -343,7 +359,7 @@ class Ambient(StdRESTbase):
_datadict['usUnits'] = weewx.US
# Reformat according to the Ambient protocol:
_post_dict = reformat_dict(_datadict, Ambient._formats)
_post_dict = reformat_dict(_datadict, self.format_dict)
# Form the full URL
_url = self.archive_url + '?' + weeutil.weeutil.urlencode(_post_dict)
@@ -367,14 +383,15 @@ class StdWunderground(Ambient):
# First extract the required parameters. If one of them is missing,
# a KeyError exception will occur. Be prepared to catch it.
try:
# Extract the dictionary with the WU options:
# Extract a copy of the dictionary with the WU options:
ambient_dict=dict(config_dict['StdRESTful']['Wunderground'])
ambient_dict.setdefault('rapidfire_url', StdWunderground.rapidfire_url)
ambient_dict.setdefault('archive_url', StdWunderground.archive_url)
ambient_dict.setdefault('name', 'Wunderground')
super(StdWunderground, self).__init__(engine, config_dict, **ambient_dict)
super(StdWunderground, self).__init__(engine, config_dict,
**ambient_dict)
syslog.syslog(syslog.LOG_DEBUG, "restx: Data will be posted to Wunderground")
except ServiceError, e:
except (KeyError, ServiceError), e:
syslog.syslog(syslog.LOG_DEBUG, "restx: Data will not be posted to Wunderground")
syslog.syslog(syslog.LOG_DEBUG, " **** Reason: %s" % e)
@@ -394,12 +411,62 @@ class StdPWSweather(Ambient):
ambient_dict=dict(config_dict['StdRESTful']['PWSweather'])
ambient_dict.setdefault('archive_url', StdPWSweather.archive_url)
ambient_dict.setdefault('name', 'PWSweather')
super(StdPWSweather, self).__init__(engine, config_dict, **ambient_dict)
super(StdPWSweather, self).__init__(engine, config_dict,
**ambient_dict)
syslog.syslog(syslog.LOG_DEBUG, "restx: Data will be posted to PWSweather")
except ServiceError, e:
except (KeyError, ServiceError), e:
syslog.syslog(syslog.LOG_DEBUG, "restx: Data will not be posted to PWSweather")
syslog.syslog(syslog.LOG_DEBUG, " **** Reason: %s" % e)
#===============================================================================
# Class StdWOW
#===============================================================================
class StdWOW(Ambient):
"""Upload using the UK Met Office's WOW protocol.
For details of the WOW upload protocol, see
http://wow.metoffice.gov.uk/support/dataformats#dataFileUpload
"""
# Types and formats of the data to be published:
_formats = {'dateTime' : ('dateutc', lambda _v : urllib.quote(datetime.datetime.utcfromtimestamp(_v).isoformat('+'), '-+')),
'station' : ('siteid', '%s'),
'password' : ('siteAuthenticationKey', '%s'),
'softwaretype': ('softwaretype', '%s'),
'barometer' : ('baromin', '%.1f'),
'outTemp' : ('tempf', '%.1f'),
'outHumidity' : ('humidity', '%.0f'),
'windSpeed' : ('windspeedmph', '%.0f'),
'windDir' : ('winddir', '%.0f'),
'windGust' : ('windgustmph', '%.0f'),
'windGustDir' : ('windgustdir', '%.0f'),
'dewpoint' : ('dewptf', '%.1f'),
'hourRain' : ('rainin', '%.2f'),
'dayRain' : ('dailyrainin', '%.2f')}
# The URL used by WOW:
archive_url = "http://wow.metoffice.gov.uk/automaticreading"
def __init__(self, engine, config_dict):
try:
ambient_dict=dict(config_dict['StdRESTful']['WOW'])
ambient_dict.setdefault('archive_url', StdWOW.archive_url)
ambient_dict.setdefault('name', 'WOW')
if ambient_dict.has_key('siteid'):
ambient_dict['station'] = ambient_dict['siteid']
if ambient_dict.has_key('siteAuthenticationKey'):
ambient_dict['password'] = ambient_dict['siteAuthenticationKey']
super(StdWOW, self).__init__(engine, config_dict,
format_dict=StdWOW._formats,
**ambient_dict)
syslog.syslog(syslog.LOG_DEBUG, "restx: Data will be posted to WOW")
except (KeyError, ServiceError), e:
syslog.syslog(syslog.LOG_DEBUG, "restx: Data will not be posted to WOW")
syslog.syslog(syslog.LOG_DEBUG, " **** Reason: %s" % e)
#===============================================================================
# Class PostRequest
#===============================================================================
@@ -465,6 +532,13 @@ class PostRequest(threading.Thread):
# in case there's a network problem.
try:
_response = urllib2.urlopen(request)
except urllib2.HTTPError, e:
# WOW signals a bad login with a HTML Error 403 code:
if e.code == 403:
raise BadLogin(e)
else:
syslog.syslog(syslog.LOG_DEBUG, "restx: Failed attempt #%d to upload to %s" % (_count+1, self.name))
syslog.syslog(syslog.LOG_DEBUG, " **** Reason: %s" % (e,))
except (urllib2.URLError, socket.error, httplib.BadStatusLine), e:
# Unsuccessful. Log it and go around again for another try
syslog.syslog(syslog.LOG_DEBUG, "restx: Failed attempt #%d to upload to %s" % (_count+1, self.name))
@@ -501,12 +575,13 @@ class StdCWOP(StdRESTbase):
# First extract the required parameters. If one of them is missing,
# a KeyError exception will occur. Be prepared to catch it.
try:
# Extract the CWOP dictionary:
# Extract a copy of the CWOP dictionary:
cwop_dict=dict(config_dict['StdRESTful']['CWOP'])
cwop_dict.setdefault('name', 'CWOP')
cwop_dict['stale'] = to_int(cwop_dict.get('stale', 1800))
cwop_dict['interval'] = to_int(cwop_dict.get('interval'))
super(StdCWOP, self).__init__(engine, config_dict, **cwop_dict)
super(StdCWOP, self).__init__(engine, config_dict,
**cwop_dict)
# Extract the station and (if necessary) passcode
self.station = cwop_dict['station'].upper()
@@ -550,80 +625,80 @@ class StdCWOP(StdRESTbase):
return self.process_record(event.record)
def get_login_string(self):
login = "user %s pass %s vers weewx %s\r\n" % (self.station, self.passcode, weewx.__version__)
return login
_login = "user %s pass %s vers weewx %s\r\n" % (self.station, self.passcode, weewx.__version__)
return _login
def get_tnc_packet(self, in_record):
"""Form the TNC2 packet used by CWOP."""
# Make sure the record is in US units:
if in_record['usUnits'] == weewx.US:
record = in_record
_record = in_record
else:
record = weewx.units.StdUnitConverters[weewx.US].convertDict(in_record)
record['usUnits'] = weewx.US
_record = weewx.units.StdUnitConverters[weewx.US].convertDict(in_record)
_record['usUnits'] = weewx.US
# Preamble to the TNC packet:
prefix = "%s>APRS,TCPIP*:" % (self.station,)
_prefix = "%s>APRS,TCPIP*:" % (self.station,)
# Time:
time_tt = time.gmtime(record['dateTime'])
time_str = time.strftime("@%d%H%Mz", time_tt)
_time_tt = time.gmtime(_record['dateTime'])
_time_str = time.strftime("@%d%H%Mz", _time_tt)
# Position:
lat_str = weeutil.weeutil.latlon_string(self.latitude, ('N', 'S'), 'lat')
lon_str = weeutil.weeutil.latlon_string(self.longitude, ('E', 'W'), 'lon')
latlon_str = '%s%s%s/%s%s%s' % (lat_str + lon_str)
_lat_str = weeutil.weeutil.latlon_string(self.latitude, ('N', 'S'), 'lat')
_lon_str = weeutil.weeutil.latlon_string(self.longitude, ('E', 'W'), 'lon')
_latlon_str = '%s%s%s/%s%s%s' % (_lat_str + _lon_str)
# Wind and temperature
wt_list = []
for obs_type in ['windDir', 'windSpeed', 'windGust', 'outTemp']:
v = record.get(obs_type)
wt_list.append("%03d" % v if v is not None else '...')
wt_str = "_%s/%sg%st%s" % tuple(wt_list)
_wt_list = []
for _obs_type in ['windDir', 'windSpeed', 'windGust', 'outTemp']:
_v = _record.get(_obs_type)
_wt_list.append("%03d" % _v if _v is not None else '...')
_wt_str = "_%s/%sg%st%s" % tuple(_wt_list)
# Rain
rain_list = []
for obs_type in ['hourRain', 'rain24', 'dayRain']:
v = record.get(obs_type)
rain_list.append("%03d" % (v * 100.0) if v is not None else '...')
rain_str = "r%sp%sP%s" % tuple(rain_list)
_rain_list = []
for _obs_type in ['hourRain', 'rain24', 'dayRain']:
_v = _record.get(_obs_type)
_rain_list.append("%03d" % (_v * 100.0) if _v is not None else '...')
_rain_str = "r%sp%sP%s" % tuple(_rain_list)
# Barometer:
baro = record.get('altimeter')
if baro is None:
baro_str = "b....."
_baro = _record.get('altimeter')
if _baro is None:
_baro_str = "b....."
else:
# While everything else in the CWOP protocol is in US Customary, they
# want the barometer in millibars.
baro_vt = weewx.units.convert((baro, 'inHg', 'group_pressure'), 'mbar')
baro_str = "b%05d" % (baro_vt[0] * 10.0)
_baro_vt = weewx.units.convert((_baro, 'inHg', 'group_pressure'), 'mbar')
_baro_str = "b%05d" % (_baro_vt[0] * 10.0)
# Humidity:
humidity = record.get('outHumidity')
if humidity is None:
humid_str = "h.."
_humidity = _record.get('outHumidity')
if _humidity is None:
_humid_str = "h.."
else:
humid_str = ("h%02d" % humidity) if humidity < 100.0 else "h00"
_humid_str = ("h%02d" % _humidity) if _humidity < 100.0 else "h00"
# Radiation:
radiation = record.get('radiation')
if radiation is None:
radiation_str = ""
elif radiation < 1000.0:
radiation_str = "L%03d" % radiation
elif radiation < 2000.0:
radiation_str = "l%03d" % (radiation - 1000)
_radiation = _record.get('radiation')
if _radiation is None:
_radiation_str = ""
elif _radiation < 1000.0:
_radiation_str = "L%03d" % _radiation
elif _radiation < 2000.0:
_radiation_str = "l%03d" % (_radiation - 1000)
else:
radiation_str = ""
_radiation_str = ""
# Station equipment
equipment_str = ".weewx-%s-%s" % (weewx.__version__, self.hardware)
_equipment_str = ".weewx-%s-%s" % (weewx.__version__, self.hardware)
tnc_packet = prefix + time_str + latlon_str + wt_str + rain_str + \
baro_str + humid_str + radiation_str + equipment_str + "\r\n"
_tnc_packet = _prefix + _time_str + _latlon_str + _wt_str + _rain_str + \
_baro_str + _humid_str + _radiation_str + _equipment_str + "\r\n"
return tnc_packet
return _tnc_packet
def format_protocol(self, record):
# Get the login string
@@ -681,17 +756,17 @@ class PostTNC(threading.Thread):
_time_str = timestamp_to_string(_timestamp)
syslog.syslog(syslog.LOG_INFO, "restx: Published record %s to %s" % (_time_str, self.name))
def send_packet(self, _login, _tnc_packet):
def send_packet(self, login, tnc_packet):
# Get a socket connection:
_sock = self._get_connect()
try:
# Send the login:
self._send(_sock, _login)
self._send(_sock, login)
# And then the packet
self._send(_sock, _tnc_packet)
self._send(_sock, tnc_packet)
finally:
_sock.close()
@@ -699,27 +774,27 @@ class PostTNC(threading.Thread):
# Go through the list of known server:ports, looking for
# a connection that works:
for serv_addr_str in self.server:
server, port = serv_addr_str.split(":")
port = int(port)
for _serv_addr_str in self.server:
_server, _port_str = _serv_addr_str.split(":")
_port = int(_port_str)
for _count in range(self.max_tries):
try:
sock = socket.socket()
sock.connect((server, port))
_sock = socket.socket()
_sock.connect((_server, _port))
except socket.error, e:
# Unsuccessful. Log it and try again
syslog.syslog(syslog.LOG_DEBUG, "restx: Connection attempt #%d failed to %s server %s:%d" % (_count + 1, self.name, server, port))
syslog.syslog(syslog.LOG_DEBUG, "restx: Connection attempt #%d failed to %s server %s:%d" % (_count + 1, self.name, _server, _port))
syslog.syslog(syslog.LOG_DEBUG, " **** Reason: %s" % (e,))
else:
syslog.syslog(syslog.LOG_DEBUG, "restx: Connected to %s server %s:%d" % (self.name, server, port))
return sock
syslog.syslog(syslog.LOG_DEBUG, "restx: Connected to %s server %s:%d" % (self.name, _server, _port))
return _sock
# Couldn't connect on this attempt. Close it, try again.
try:
sock.close()
_sock.close()
except:
pass
# If we got here, that server didn't work. Log it and go on to the next one.
syslog.syslog(syslog.LOG_DEBUG, "restx: Unable to connect to %s server %s:%d" % (self.name, server, port))
syslog.syslog(syslog.LOG_DEBUG, "restx: Unable to connect to %s server %s:%d" % (self.name, _server, _port))
# If we got here. None of the servers worked. Raise an exception
raise IOError, "Unable to obtain a socket connection to %s" % (self.name,)