Rapidfire now works.

This commit is contained in:
Tom Keffer
2013-11-03 00:56:57 +00:00
parent 986c0113a5
commit 380727c961

View File

@@ -12,13 +12,12 @@ import httplib
import socket
import syslog
import threading
import time
import urllib
import urllib2
import weewx.wxengine
import weeutil.weeutil
import weewx.archive
import weewx.wxengine
class ServiceError(Exception):
"""Raised when not enough info is available to start a service."""
@@ -28,12 +27,15 @@ class SkippedPost(Exception):
"""Raised when a post is skipped."""
class BadLogin(StandardError):
"""Raised when login information is bad or missing."""
class TimeToExit(Exception):
"""Raised when it's time for a threaded queue to exit."""
#===============================================================================
# Class StdRESTbase
#===============================================================================
class StdRESTbase(weewx.wxengine.StdService):
"""Base class for RESTful weewx services."""
def __init__(self, engine, config_dict):
super(StdRESTbase, self).__init__(engine, config_dict)
@@ -101,6 +103,7 @@ class StdRESTbase(weewx.wxengine.StdService):
#===============================================================================
class Ambient(StdRESTbase):
"""Base class for weather sites that use the Ambient protocol."""
# Types and formats of the data to be published:
_formats = {'dateTime' : ('dateutc', '%s'),
@@ -120,7 +123,7 @@ class Ambient(StdRESTbase):
super(Ambient, self).__init__(engine, ambient_dict)
try:
self.station = ambient_dict['station']
self.station = ambient_dict['station']
self.password = ambient_dict['password']
do_rapidfire_post = weeutil.weeutil.tobool(ambient_dict.get('rapidfire', False))
@@ -129,15 +132,19 @@ class Ambient(StdRESTbase):
self.rapidfire_url = ambient_dict['rapidfire_url']
self.init_loop_queue()
self.bind(weewx.NEW_LOOP_PACKET, self.new_loop_packet)
self.loop_thread = PostRequest(self.loop_queue, name="Wunderground-Rapidfire")
max_backlog = int(ambient_dict.get('rapidfire_max_backlog', 0))
self.loop_thread = PostRequest(self.loop_queue,
name="Wunderground-Rapidfire",
max_backlog=max_backlog)
self.loop_thread.start()
if do_archive_post:
self.archive_url = ambient_dict['archive_url']
self.init_archive_queue()
self.bind(weewx.NEW_ARCHIVE_RECORD, self.new_archive_record)
self.archive_thread = PostRequest(self.archive_queue, name="Wunderground")
self.archive_thread = PostRequest(self.archive_queue,
name="Wunderground")
self.archive_thread.start()
if do_rapidfire_post or do_archive_post:
self.archive = weewx.archive.Archive.open(ambient_dict['archive_db_dict'])
@@ -152,7 +159,7 @@ class Ambient(StdRESTbase):
_post_dict['rtfreq'] = '2.5'
_url = self.rapidfire_url + '?' + weeutil.weeutil.urlencode(_post_dict)
_request = urllib2.Request(_url)
self.loop_queue.put(_request)
self.loop_queue.put((_record['dateTime'], _request))
pass
def new_archive_record(self, event):
@@ -160,10 +167,15 @@ class Ambient(StdRESTbase):
_post_dict = self.extract_from(_record)
_url = self.archive_url + '?' + weeutil.weeutil.urlencode(_post_dict)
_request = urllib2.Request(_url)
self.archive_queue.put(_request)
self.loop_queue.put((_record['dateTime'], _request))
pass
def extract_from(self, record):
"""Given a record, format it using the Ambient protocol.
Returns:
A dictionary where the keys are the Ambient keywords, and the values
are strings formatted according to the Ambient protocol."""
if record['usUnits'] != weewx.US:
raise weewx.ViolatedPrecondition("Unit system (%d) must be in US Customary" % record['usUnits'])
@@ -199,7 +211,9 @@ class Ambient(StdRESTbase):
#===============================================================================
class Wunderground(Ambient):
"""Specialized version of the Ambient protocol for the Weather Underground."""
# The URLs used by the WU:
rapidfire_url = "http://rtupdate.wunderground.com/weatherstation/updateweatherstation.php"
archive_url = "http://weatherstation.wunderground.com/weatherstation/updateweatherstation.php"
@@ -216,14 +230,12 @@ class Wunderground(Ambient):
syslog.syslog(syslog.LOG_DEBUG, "restx: Data will not be posted to Wunderground")
syslog.syslog(syslog.LOG_DEBUG, " **** Reason: %s" % e)
#===============================================================================
# Class PostURL
# Class PostRequest
#===============================================================================
class PostRequest(threading.Thread):
"""Post something using a RESTful HTTP GET or POST"""
"""Post an urllib2 "Request" object"""
def __init__(self, queue, **kwargs):
@@ -232,37 +244,62 @@ class PostRequest(threading.Thread):
self.queue = queue
self.max_tries = int(kwargs.get('max_tries', 3))
self.max_backlog = kwargs.get('max_backlog')
if self.max_backlog is not None:
self.max_backlog = int(self.max_backlog)
self.setDaemon(True)
def run(self):
while True:
# This will block until a request shows up.
_request = self.queue.get()
# If a "None" value appears in the pipe, it's our signal to exit:
if _request is None:
return
try:
while True:
_request_tuple = self.fetch_from_queue()
# If packets have backed up in the queue, throw them all away except the last one.
if self.max_backlog is not None:
_request_tuple = self.fetch_from_queue()
# Unpack the timestamp and Request object
_timestamp, _request = _request_tuple
try:
# Now post it
self.post_request(_request)
except FailedPost:
syslog.syslog(syslog.LOG_ERR, "restx: Failed to upload to '%s'" % self.name)
except BadLogin, e:
syslog.syslog(syslog.LOG_CRIT, "restx: Failed to post to '%s'" % self.name)
syslog.syslog(syslog.LOG_CRIT, " **** Reason: %s" % e)
syslog.syslog(syslog.LOG_CRIT, " **** Terminating %s thread" % self.name)
return
else:
_time_str = weeutil.weeutil.timestamp_to_string(_timestamp)
syslog.syslog(syslog.LOG_INFO, "restx: Published record %s to %s" % (_time_str, self.name))
except TimeToExit:
syslog.syslog(syslog.LOG_DEBUG, "restx: Thread %s received request to exit." % self.name)
try:
# Now post it
self.post_request(_request)
except FailedPost:
syslog.syslog(syslog.LOG_ERR, "restx: Failed to upload to '%s'" % self.name)
except BadLogin, e:
syslog.syslog(syslog.LOG_CRIT, "restx: Failed to post to '%s'" % self.name)
syslog.syslog(syslog.LOG_CRIT, " **** Reason: %s" % e)
syslog.syslog(syslog.LOG_CRIT, " **** Terminating %s thread" % self.name)
return
def fetch_from_queue(self):
# This will block until a request shows up.
_request_tuple = self.queue.get()
# If a "None" value appears in the pipe, it's our signal to exit:
if _request_tuple is None:
raise TimeToExit
return _request_tuple
def post_request(self, request):
"""Post a request.
request: An instance of urllib2.Request
"""
# Retry up to max_tries times:
for _count in range(self.max_tries):
# Now use urllib2 to post the data. Wrap in a try block
# in case there's a network problem.
try:
print request._Request__original
_response = urllib2.urlopen(request)
except (urllib2.URLError, socket.error, httplib.BadStatusLine), e:
# Unsuccessful. Log it and go around again for another try
@@ -283,5 +320,3 @@ class PostRequest(threading.Thread):
# This is executed only if the loop terminates normally, meaning
# the upload failed max_tries times. Log it.
raise FailedPost("Failed upload to site %s after %d tries" % (self.name, self.max_tries))