diff --git a/bin/weewx/restx.py b/bin/weewx/restx.py index 06c5f6ec..521b4ad1 100644 --- a/bin/weewx/restx.py +++ b/bin/weewx/restx.py @@ -12,13 +12,12 @@ import httplib import socket import syslog import threading -import time import urllib import urllib2 -import weewx.wxengine import weeutil.weeutil import weewx.archive +import weewx.wxengine class ServiceError(Exception): """Raised when not enough info is available to start a service.""" @@ -28,12 +27,15 @@ class SkippedPost(Exception): """Raised when a post is skipped.""" class BadLogin(StandardError): """Raised when login information is bad or missing.""" - +class TimeToExit(Exception): + """Raised when it's time for a threaded queue to exit.""" + #=============================================================================== # Class StdRESTbase #=============================================================================== class StdRESTbase(weewx.wxengine.StdService): + """Base class for RESTful weewx services.""" def __init__(self, engine, config_dict): super(StdRESTbase, self).__init__(engine, config_dict) @@ -101,6 +103,7 @@ class StdRESTbase(weewx.wxengine.StdService): #=============================================================================== class Ambient(StdRESTbase): + """Base class for weather sites that use the Ambient protocol.""" # Types and formats of the data to be published: _formats = {'dateTime' : ('dateutc', '%s'), @@ -120,7 +123,7 @@ class Ambient(StdRESTbase): super(Ambient, self).__init__(engine, ambient_dict) try: - self.station = ambient_dict['station'] + self.station = ambient_dict['station'] self.password = ambient_dict['password'] do_rapidfire_post = weeutil.weeutil.tobool(ambient_dict.get('rapidfire', False)) @@ -129,15 +132,19 @@ class Ambient(StdRESTbase): self.rapidfire_url = ambient_dict['rapidfire_url'] self.init_loop_queue() self.bind(weewx.NEW_LOOP_PACKET, self.new_loop_packet) - self.loop_thread = PostRequest(self.loop_queue, name="Wunderground-Rapidfire") + max_backlog = int(ambient_dict.get('rapidfire_max_backlog', 0)) + self.loop_thread = PostRequest(self.loop_queue, + name="Wunderground-Rapidfire", + max_backlog=max_backlog) self.loop_thread.start() if do_archive_post: self.archive_url = ambient_dict['archive_url'] self.init_archive_queue() self.bind(weewx.NEW_ARCHIVE_RECORD, self.new_archive_record) - self.archive_thread = PostRequest(self.archive_queue, name="Wunderground") + self.archive_thread = PostRequest(self.archive_queue, + name="Wunderground") self.archive_thread.start() - + if do_rapidfire_post or do_archive_post: self.archive = weewx.archive.Archive.open(ambient_dict['archive_db_dict']) @@ -152,7 +159,7 @@ class Ambient(StdRESTbase): _post_dict['rtfreq'] = '2.5' _url = self.rapidfire_url + '?' + weeutil.weeutil.urlencode(_post_dict) _request = urllib2.Request(_url) - self.loop_queue.put(_request) + self.loop_queue.put((_record['dateTime'], _request)) pass def new_archive_record(self, event): @@ -160,10 +167,15 @@ class Ambient(StdRESTbase): _post_dict = self.extract_from(_record) _url = self.archive_url + '?' + weeutil.weeutil.urlencode(_post_dict) _request = urllib2.Request(_url) - self.archive_queue.put(_request) + self.loop_queue.put((_record['dateTime'], _request)) pass def extract_from(self, record): + """Given a record, format it using the Ambient protocol. + + Returns: + A dictionary where the keys are the Ambient keywords, and the values + are strings formatted according to the Ambient protocol.""" if record['usUnits'] != weewx.US: raise weewx.ViolatedPrecondition("Unit system (%d) must be in US Customary" % record['usUnits']) @@ -199,7 +211,9 @@ class Ambient(StdRESTbase): #=============================================================================== class Wunderground(Ambient): + """Specialized version of the Ambient protocol for the Weather Underground.""" + # The URLs used by the WU: rapidfire_url = "http://rtupdate.wunderground.com/weatherstation/updateweatherstation.php" archive_url = "http://weatherstation.wunderground.com/weatherstation/updateweatherstation.php" @@ -216,14 +230,12 @@ class Wunderground(Ambient): syslog.syslog(syslog.LOG_DEBUG, "restx: Data will not be posted to Wunderground") syslog.syslog(syslog.LOG_DEBUG, " **** Reason: %s" % e) - - #=============================================================================== -# Class PostURL +# Class PostRequest #=============================================================================== class PostRequest(threading.Thread): - """Post something using a RESTful HTTP GET or POST""" + """Post an urllib2 "Request" object""" def __init__(self, queue, **kwargs): @@ -232,37 +244,62 @@ class PostRequest(threading.Thread): self.queue = queue self.max_tries = int(kwargs.get('max_tries', 3)) + self.max_backlog = kwargs.get('max_backlog') + if self.max_backlog is not None: + self.max_backlog = int(self.max_backlog) self.setDaemon(True) def run(self): - while True: - # This will block until a request shows up. - _request = self.queue.get() - # If a "None" value appears in the pipe, it's our signal to exit: - if _request is None: - return + try: + while True: + _request_tuple = self.fetch_from_queue() + + # If packets have backed up in the queue, throw them all away except the last one. + if self.max_backlog is not None: + _request_tuple = self.fetch_from_queue() + + # Unpack the timestamp and Request object + _timestamp, _request = _request_tuple + + try: + # Now post it + self.post_request(_request) + except FailedPost: + syslog.syslog(syslog.LOG_ERR, "restx: Failed to upload to '%s'" % self.name) + except BadLogin, e: + syslog.syslog(syslog.LOG_CRIT, "restx: Failed to post to '%s'" % self.name) + syslog.syslog(syslog.LOG_CRIT, " **** Reason: %s" % e) + syslog.syslog(syslog.LOG_CRIT, " **** Terminating %s thread" % self.name) + return + else: + _time_str = weeutil.weeutil.timestamp_to_string(_timestamp) + syslog.syslog(syslog.LOG_INFO, "restx: Published record %s to %s" % (_time_str, self.name)) + except TimeToExit: + syslog.syslog(syslog.LOG_DEBUG, "restx: Thread %s received request to exit." % self.name) - try: - # Now post it - self.post_request(_request) - except FailedPost: - syslog.syslog(syslog.LOG_ERR, "restx: Failed to upload to '%s'" % self.name) - except BadLogin, e: - syslog.syslog(syslog.LOG_CRIT, "restx: Failed to post to '%s'" % self.name) - syslog.syslog(syslog.LOG_CRIT, " **** Reason: %s" % e) - syslog.syslog(syslog.LOG_CRIT, " **** Terminating %s thread" % self.name) - return + def fetch_from_queue(self): + + # This will block until a request shows up. + _request_tuple = self.queue.get() + # If a "None" value appears in the pipe, it's our signal to exit: + if _request_tuple is None: + raise TimeToExit + return _request_tuple + def post_request(self, request): + """Post a request. + + request: An instance of urllib2.Request + """ # Retry up to max_tries times: for _count in range(self.max_tries): # Now use urllib2 to post the data. Wrap in a try block # in case there's a network problem. try: - print request._Request__original _response = urllib2.urlopen(request) except (urllib2.URLError, socket.error, httplib.BadStatusLine), e: # Unsuccessful. Log it and go around again for another try @@ -283,5 +320,3 @@ class PostRequest(threading.Thread): # This is executed only if the loop terminates normally, meaning # the upload failed max_tries times. Log it. raise FailedPost("Failed upload to site %s after %d tries" % (self.name, self.max_tries)) - -