Module meshtastic.util

Expand source code
from collections import defaultdict
import serial, traceback
import serial.tools.list_ports
from queue import Queue
import threading, sys, time, logging

"""Some devices such as a seger jlink we never want to accidentally open"""
blacklistVids = dict.fromkeys([0x1366])


def stripnl(s):
    """remove newlines from a string (and remove extra whitespace)"""
    s = str(s).replace("\n", " ")
    return ' '.join(s.split())


def fixme(message):
    raise Exception(f"FIXME: {message}")


def catchAndIgnore(reason, closure):
    """Call a closure but if it throws an excpetion print it and continue"""
    try:
        closure()
    except BaseException as ex:
        logging.error(f"Exception thrown in {reason}: {ex}")


def findPorts():
    """Find all ports that might have meshtastic devices

    Returns:
        list -- a list of device paths
    """
    l = list(map(lambda port: port.device,
                 filter(lambda port: port.vid != None and port.vid not in blacklistVids,
                        serial.tools.list_ports.comports())))
    l.sort()
    return l


class dotdict(dict):
    """dot.notation access to dictionary attributes"""
    __getattr__ = dict.get
    __setattr__ = dict.__setitem__
    __delattr__ = dict.__delitem__


class Timeout:
    def __init__(self, maxSecs=20):
        self.expireTime = 0
        self.sleepInterval = 0.1
        self.expireTimeout = maxSecs

    def reset(self):
        """Restart the waitForSet timer"""
        self.expireTime = time.time() + self.expireTimeout

    def waitForSet(self, target, attrs=()):
        """Block until the specified attributes are set. Returns True if config has been received."""
        self.reset()
        while time.time() < self.expireTime:
            if all(map(lambda a: getattr(target, a, None), attrs)):
                return True
            time.sleep(self.sleepInterval)
        return False


class DeferredExecution():
    """A thread that accepts closures to run, and runs them as they are received"""

    def __init__(self, name=None):
        self.queue = Queue()
        self.thread = threading.Thread(target=self._run, args=(), name=name)
        self.thread.daemon = True
        self.thread.start()

    def queueWork(self, runnable):
        self.queue.put(runnable)

    def _run(self):
        while True:
            try:
                o = self.queue.get()
                o()
            except:
                logging.error(
                    f"Unexpected error in deferred execution {sys.exc_info()[0]}")
                print(traceback.format_exc())

Functions

def catchAndIgnore(reason, closure)

Call a closure but if it throws an excpetion print it and continue

Expand source code
def catchAndIgnore(reason, closure):
    """Call a closure but if it throws an excpetion print it and continue"""
    try:
        closure()
    except BaseException as ex:
        logging.error(f"Exception thrown in {reason}: {ex}")
def findPorts()

Find all ports that might have meshtastic devices

Returns

list – a list of device paths

Expand source code
def findPorts():
    """Find all ports that might have meshtastic devices

    Returns:
        list -- a list of device paths
    """
    l = list(map(lambda port: port.device,
                 filter(lambda port: port.vid != None and port.vid not in blacklistVids,
                        serial.tools.list_ports.comports())))
    l.sort()
    return l
def fixme(message)
Expand source code
def fixme(message):
    raise Exception(f"FIXME: {message}")
def stripnl(s)

remove newlines from a string (and remove extra whitespace)

Expand source code
def stripnl(s):
    """remove newlines from a string (and remove extra whitespace)"""
    s = str(s).replace("\n", " ")
    return ' '.join(s.split())

Classes

class DeferredExecution (name=None)

A thread that accepts closures to run, and runs them as they are received

Expand source code
class DeferredExecution():
    """A thread that accepts closures to run, and runs them as they are received"""

    def __init__(self, name=None):
        self.queue = Queue()
        self.thread = threading.Thread(target=self._run, args=(), name=name)
        self.thread.daemon = True
        self.thread.start()

    def queueWork(self, runnable):
        self.queue.put(runnable)

    def _run(self):
        while True:
            try:
                o = self.queue.get()
                o()
            except:
                logging.error(
                    f"Unexpected error in deferred execution {sys.exc_info()[0]}")
                print(traceback.format_exc())

Methods

def queueWork(self, runnable)
Expand source code
def queueWork(self, runnable):
    self.queue.put(runnable)
class Timeout (maxSecs=20)
Expand source code
class Timeout:
    def __init__(self, maxSecs=20):
        self.expireTime = 0
        self.sleepInterval = 0.1
        self.expireTimeout = maxSecs

    def reset(self):
        """Restart the waitForSet timer"""
        self.expireTime = time.time() + self.expireTimeout

    def waitForSet(self, target, attrs=()):
        """Block until the specified attributes are set. Returns True if config has been received."""
        self.reset()
        while time.time() < self.expireTime:
            if all(map(lambda a: getattr(target, a, None), attrs)):
                return True
            time.sleep(self.sleepInterval)
        return False

Methods

def reset(self)

Restart the waitForSet timer

Expand source code
def reset(self):
    """Restart the waitForSet timer"""
    self.expireTime = time.time() + self.expireTimeout
def waitForSet(self, target, attrs=())

Block until the specified attributes are set. Returns True if config has been received.

Expand source code
def waitForSet(self, target, attrs=()):
    """Block until the specified attributes are set. Returns True if config has been received."""
    self.reset()
    while time.time() < self.expireTime:
        if all(map(lambda a: getattr(target, a, None), attrs)):
            return True
        time.sleep(self.sleepInterval)
    return False
class dotdict (*args, **kwargs)

dot.notation access to dictionary attributes

Expand source code
class dotdict(dict):
    """dot.notation access to dictionary attributes"""
    __getattr__ = dict.get
    __setattr__ = dict.__setitem__
    __delattr__ = dict.__delitem__

Ancestors

  • builtins.dict