Refactored to allow get_readings() to be used externally.

This commit is contained in:
Tom Keffer
2021-04-23 17:25:43 -07:00
parent 549407a022
commit 4fd4761699

View File

@@ -150,8 +150,8 @@ class Station(object):
DEFAULT_PORT = '/dev/ttyUSB0'
def __init__(self, port, debug_serial=0):
self._debug_serial = debug_serial
self.port = port
self._debug_serial = debug_serial
self.baudrate = 2400
self.timeout = 3 # seconds
self.serial_port = None
@@ -225,53 +225,59 @@ class Station(object):
log.debug("Set station to modem mode")
self.serial_port.write(b">\r")
def get_readings_with_retry(self, max_tries=5, retry_wait=3):
def get_readings_with_retry(self, max_tries, retry_wait):
for ntries in range(max_tries):
try:
buf = self.get_readings()
buf = get_readings(self.serial_port, self._debug_serial)
validate_string(buf)
return buf
except (serial.SerialException, weewx.WeeWxIOError) as e:
log.info("Failed attempt %d of %d to get readings: %s", ntries + 1, max_tries, e)
log.info("Failed attempt %d of %d to get readings: %s",
ntries + 1, max_tries, e)
time.sleep(retry_wait)
else:
msg = "Max retries (%d) exceeded for readings" % max_tries
log.error(msg)
raise weewx.RetriesExceeded(msg)
def get_readings(self):
"""Read an Ultimeter sentence.
Returns:
bytearray: A bytearray containing the sentence.
"""
# Search for the character '!', which marks the beginning of a "sentence":
while True:
c = self.serial_port.read(1)
if c == b'!':
break
# Save the first '!' ...
buf = bytearray(c)
# ... then read until we get to a '\r' or '\n'
while True:
c = self.serial_port.read(1)
if c == b'\n' or c == b'\r':
# We found a carriage return or newline, so we have the complete sentence.
# NB: Because the Ultimeter terminates a sentence with a '\r\n', this will
# leave a newline in the buffer. We don't care: it will get skipped over when
# we search for the next sentence.
break
buf += c
if self._debug_serial:
log.debug("Station said: %s", _fmt(buf))
return buf
# ##############################################################33
# Decode utilities
# Utilities
# ##############################################################33
def get_readings(serial_port, debug_serial):
"""Read an Ultimeter sentence from a serial port.
Args:
serial_port (serial.Serial): An open port
debug_serial (int): Set to greater than zero for extra debug information.
Returns:
bytearray: A bytearray containing the sentence.
"""
# Search for the character '!', which marks the beginning of a "sentence":
while True:
c = serial_port.read(1)
if c == b'!':
break
# Save the first '!' ...
buf = bytearray(c)
# ... then read until we get to a '\r' or '\n'
while True:
c = serial_port.read(1)
if c == b'\n' or c == b'\r':
# We found a carriage return or newline, so we have the complete sentence.
# NB: Because the Ultimeter terminates a sentence with a '\r\n', this will
# leave a newline in the buffer. We don't care: it will get skipped over when
# we search for the next sentence.
break
buf += c
if debug_serial:
log.debug("Station said: %s", _fmt(buf))
return buf
def validate_string(buf, choices=None):
"""Validate a data buffer.