Module meshtastic.util

Expand source code
from collections import defaultdict
import serial
import serial.tools.list_ports
from queue import Queue
import threading, sys, logging

"""Some devices such as a seger jlink we never want to accidentally open"""
blacklistVids = dict.fromkeys([0x1366])


def stripnl(s):
    """remove newlines from a string (and remove extra whitespace)"""
    s = str(s).replace("\n", " ")
    return ' '.join(s.split())


def fixme(message):
    raise Exception(f"FIXME: {message}")


def catchAndIgnore(reason, closure):
    """Call a closure but if it throws an excpetion print it and continue"""
    try:
        closure()
    except BaseException as ex:
        logging.error(f"Exception thrown in {reason}: {ex}")


def findPorts():
    """Find all ports that might have meshtastic devices

    Returns:
        list -- a list of device paths
    """
    l = list(map(lambda port: port.device,
                 filter(lambda port: port.vid != None and port.vid not in blacklistVids,
                        serial.tools.list_ports.comports())))
    l.sort()
    return l


class dotdict(dict):
    """dot.notation access to dictionary attributes"""
    __getattr__ = dict.get
    __setattr__ = dict.__setitem__
    __delattr__ = dict.__delitem__


class DeferredExecution():
    """A thread that accepts closures to run, and runs them as they are received"""

    def __init__(self, name=None):
        self.queue = Queue()
        self.thread = threading.Thread(target=self._run, args=(), name=name)
        self.thread.daemon = True
        self.thread.start()

    def queueWork(self, runnable):
        self.queue.put(runnable)

    def _run(self):
        while True:
            try:
                o = self.queue.get()
                o()
            except:
                logging.error(
                    f"Unexpected error in deferred execution {sys.exc_info()[0]}")

Functions

def catchAndIgnore(reason, closure)

Call a closure but if it throws an excpetion print it and continue

Expand source code
def catchAndIgnore(reason, closure):
    """Call a closure but if it throws an excpetion print it and continue"""
    try:
        closure()
    except BaseException as ex:
        logging.error(f"Exception thrown in {reason}: {ex}")
def findPorts()

Find all ports that might have meshtastic devices

Returns

list – a list of device paths

Expand source code
def findPorts():
    """Find all ports that might have meshtastic devices

    Returns:
        list -- a list of device paths
    """
    l = list(map(lambda port: port.device,
                 filter(lambda port: port.vid != None and port.vid not in blacklistVids,
                        serial.tools.list_ports.comports())))
    l.sort()
    return l
def fixme(message)
Expand source code
def fixme(message):
    raise Exception(f"FIXME: {message}")
def stripnl(s)

remove newlines from a string (and remove extra whitespace)

Expand source code
def stripnl(s):
    """remove newlines from a string (and remove extra whitespace)"""
    s = str(s).replace("\n", " ")
    return ' '.join(s.split())

Classes

class DeferredExecution (name=None)

A thread that accepts closures to run, and runs them as they are received

Expand source code
class DeferredExecution():
    """A thread that accepts closures to run, and runs them as they are received"""

    def __init__(self, name=None):
        self.queue = Queue()
        self.thread = threading.Thread(target=self._run, args=(), name=name)
        self.thread.daemon = True
        self.thread.start()

    def queueWork(self, runnable):
        self.queue.put(runnable)

    def _run(self):
        while True:
            try:
                o = self.queue.get()
                o()
            except:
                logging.error(
                    f"Unexpected error in deferred execution {sys.exc_info()[0]}")

Methods

def queueWork(self, runnable)
Expand source code
def queueWork(self, runnable):
    self.queue.put(runnable)
class dotdict (*args, **kwargs)

dot.notation access to dictionary attributes

Expand source code
class dotdict(dict):
    """dot.notation access to dictionary attributes"""
    __getattr__ = dict.get
    __setattr__ = dict.__setitem__
    __delattr__ = dict.__delitem__

Ancestors

  • builtins.dict