remove observable - switch because we are already using pubsub elsewhere

This commit is contained in:
Kevin Hester
2024-06-23 08:17:54 -07:00
parent 46edd78f92
commit 7b18fd599c
5 changed files with 23 additions and 59 deletions

View File

@@ -31,6 +31,7 @@ type of packet, you should subscribe to the full topic name. If you want to see
- meshtastic.receive.user(packet)
- meshtastic.receive.data.portnum(packet) (where portnum is an integer or well known PortNum enum)
- meshtastic.node.updated(node = NodeInfo) - published when a node in the DB changes (appears, location changed, username changed, etc...)
- meshtastic.log.line(line) - a raw unparsed log line from the radio
We receive position, user, or data packets from the mesh. You probably only care about meshtastic.receive.data. The first argument for
that publish will be the packet. Text or binary data packets (from sendData or sendText) will both arrive this way. If you print packet

View File

@@ -40,7 +40,6 @@ from meshtastic.util import (
stripnl,
message_to_json,
)
from meshtastic.observable import Observable
class MeshInterface: # pylint: disable=R0902
"""Interface class for meshtastic devices
@@ -71,7 +70,6 @@ class MeshInterface: # pylint: disable=R0902
self.nodes: Optional[Dict[str,Dict]] = None # FIXME
self.isConnected: threading.Event = threading.Event()
self.noProto: bool = noProto
self.onLogMessage = Observable()
self.localNode: meshtastic.node.Node = meshtastic.node.Node(self, -1) # We fixup nodenum later
self.myInfo: Optional[mesh_pb2.MyNodeInfo] = None # We don't have device info yet
self.metadata: Optional[mesh_pb2.DeviceMetadata] = None # We don't have device metadata yet
@@ -93,6 +91,12 @@ class MeshInterface: # pylint: disable=R0902
self.queue: collections.OrderedDict = collections.OrderedDict()
self._localChannels = None
# We could have just not passed in debugOut to MeshInterface, and instead told consumers to subscribe to
# the meshtastic.log.line publish instead. Alas though changing that now would be a breaking API change
# for any external consumers of the library.
if debugOut:
pub.subscribe(MeshInterface._printLogLine, "meshtastic.log.line")
def close(self):
"""Shutdown this interface"""
if self.heartbeatTimer:
@@ -112,9 +116,14 @@ class MeshInterface: # pylint: disable=R0902
logging.error(f"Traceback: {traceback}")
self.close()
def _handleLogLine(self, line):
@staticmethod
def _printLogLine(line, interface):
"""Print a line of log output"""
interface.debugOut.write(line + "\n")
def _handleLogLine(self, line: str) -> None:
"""Handle a line of log output from the device."""
self.onLogMessage.fire(message=line)
pub.sendMessage("meshtastic.log.line", line=line, interface=self)
def showInfo(self, file=sys.stdout) -> str: # pylint: disable=W0613
"""Show human readable summary about this object"""

View File

@@ -1,43 +0,0 @@
"""A basic implementation of the observer pattern."""
import typing
class Event:
"""A simple event class."""
def __init__(self, source) -> None:
self.source = source
def __getattr__(self, name: str) -> typing.Any:
"""We dynamically add attributes to this class, so stub out __getattr__ so that mypy doesn't complain."""
class Observable:
"""A class that represents an observable object.
To publish an event call fire(type="progress", percent=50) or whatever. It will call
"""
def __init__(self):
"""Initialize the Observable object."""
self.callbacks = []
def subscribe(self, callback):
"""Subscribe to the observable object.
Args:
callback (function): The callback function to be called when the event is fired.
"""
self.callbacks.append(callback)
def fire(self, **attrs):
"""Fire the event.
Args:
**attrs: Arbitrary keyword arguments to be passed to the callback functions.
"""
e = Event(self)
for k, v in attrs.items():
setattr(e, k, v)
for fn in self.callbacks:
fn(e)

View File

@@ -8,9 +8,9 @@ from dataclasses import dataclass
import parse
import pandas as pd
from pubsub import pub # type: ignore[import-untyped]
from meshtastic.mesh_interface import MeshInterface
from meshtastic.observable import Event
from meshtastic.powermon import PowerMeter
@@ -20,14 +20,14 @@ class LogDef:
code: str # i.e. PM or B or whatever... see meshtastic slog documentation
format: str # A format string that can be used to parse the arguments
def __init__(self, code: str, format: str) -> None:
def __init__(self, code: str, fmt: str) -> None:
"""Initialize the LogDef object.
code (str): The code.
format (str): The format.
"""
self.code = code
self.format = parse.compile(format)
self.format = parse.compile(fmt)
"""A dictionary mapping from logdef code to logdef"""
log_defs = {d.code: d for d in [
@@ -56,7 +56,7 @@ class StructuredLogger:
self.newData: list[dict] = []
atexit.register(self._exitHandler)
client.onLogMessage.subscribe(self._onLogMessage)
pub.subscribe(self._onLogMessage, "meshtastic.log.line")
def getRawData(self) -> pd.DataFrame:
"""Get the raw data.
@@ -87,12 +87,12 @@ class StructuredLogger:
logging.info(f"Storing slog in {fn}")
self.getRawData().to_csv(fn)
def _onLogMessage(self, ev: Event) -> None:
def _onLogMessage(self, line: str, interface: MeshInterface) -> None: # pylint: disable=unused-argument
"""Handle log messages.
ev (Event): The log event.
line (str): the line of log output
"""
m = log_regex.match(ev.message)
m = log_regex.match(line)
if m:
src = m.group(1)
args = m.group(2)
@@ -110,6 +110,6 @@ class StructuredLogger:
self.newData.append(di)
self.getRawData()
else:
logging.warning(f"Failed to parse slog {ev.message} with {d.format}")
logging.warning(f"Failed to parse slog {line} with {d.format}")
else:
logging.warning(f"Unknown Structured Log: {ev.message}")
logging.warning(f"Unknown Structured Log: {line}")

View File

@@ -142,9 +142,6 @@ class StreamInterface(MeshInterface):
else:
self.cur_log_line += utf
if self.debugOut is not None:
self.debugOut.write(utf)
def __reader(self):
"""The reader thread that reads bytes from our stream"""
logging.debug("in __reader()")