mirror of
https://github.com/meshtastic/python.git
synced 2026-06-02 12:45:00 -04:00
Merge pull request #928 from ianmcorvidae/examples
Make examples more regularized and focused, and add contribution guidelines for the examples folder
This commit is contained in:
70
examples/CONTRIBUTING.md
Normal file
70
examples/CONTRIBUTING.md
Normal file
@@ -0,0 +1,70 @@
|
||||
# Contributing Example Scripts
|
||||
|
||||
Use this guide when adding or updating scripts in `examples/`.
|
||||
|
||||
## Must-have checklist before opening a PR
|
||||
|
||||
1. Script teaches one clear thing (its primary learning goal).
|
||||
2. File name matches that goal.
|
||||
3. Top docstring states purpose, transport scope, behavior, and expected output.
|
||||
4. Script has safe shutdown (`with ...` or `finally`) and graceful `KeyboardInterrupt` handling.
|
||||
5. Argument handling is clear (`argparse` preferred).
|
||||
6. Errors are explicit (no bare `except:`).
|
||||
7. The script is not a near-duplicate of an existing example.
|
||||
|
||||
## Choose the right teaching goal
|
||||
|
||||
Each example should have one primary lesson. Keep it focused.
|
||||
|
||||
- Good: "Send one text message over serial."
|
||||
- Good: "Print inbound text messages."
|
||||
- Avoid: discovery + chat + config mutation all in one script unless that combined flow is the lesson.
|
||||
|
||||
## Transport scope must be explicit
|
||||
|
||||
State exactly what transports are supported and why.
|
||||
|
||||
- Serial-only when that keeps the example simplest.
|
||||
- Multi-transport (Serial/TCP/BLE) only when transport selection is part of the lesson.
|
||||
- If TCP/BLE are supported, expose explicit flags (`--host`, `--ble`) and document defaults.
|
||||
|
||||
## Behavior and output should be predictable
|
||||
|
||||
Readers should know if the script sends, receives, mutates config, or combines those.
|
||||
|
||||
- Receive examples: subscribe to the narrowest pubsub topic that matches the lesson.
|
||||
- Send examples: clarify destination behavior (broadcast default vs explicit destination).
|
||||
- Mutation examples: clearly document side effects.
|
||||
|
||||
Output should make success easy to confirm:
|
||||
|
||||
- Print concise, stable status/event lines.
|
||||
- Avoid noisy debug output unless the script is specifically diagnostic-focused.
|
||||
|
||||
## Cleanup and error handling
|
||||
|
||||
- Use context managers where practical; otherwise close interfaces in `finally`.
|
||||
- Handle `KeyboardInterrupt` cleanly.
|
||||
- Exit non-zero for invalid args, connection/setup failures, or command failures.
|
||||
|
||||
## Naming guidance
|
||||
|
||||
Use descriptive names tied to the teaching goal.
|
||||
|
||||
- Prefer names like `tcp_connection_info_once.py` over `pub_sub_example.py`.
|
||||
- Prefer names like `tcp_pubsub_send_and_receive.py` over `pub_sub_example2.py`.
|
||||
- Avoid generic names such as `example2.py`.
|
||||
|
||||
Keep existing filenames only when compatibility or discoverability outweighs clarity.
|
||||
|
||||
## New script vs extending an existing one
|
||||
|
||||
Create a new script when:
|
||||
|
||||
- The learning goal is genuinely distinct.
|
||||
- Combining behaviors would make either example harder to understand.
|
||||
|
||||
Extend an existing script when:
|
||||
|
||||
- The change deepens the same lesson.
|
||||
- The resulting script remains focused and readable.
|
||||
@@ -1,21 +1,42 @@
|
||||
"""Simple program to demo how to use meshtastic library.
|
||||
To run: python examples/get_hw.py
|
||||
"""Print the local node hardware model.
|
||||
|
||||
Purpose: show the narrowest read-only local hardware lookup.
|
||||
Transport scope: Serial only.
|
||||
Behavior: reads local node metadata and prints hwModel.
|
||||
Expected output: one hardware model line, if available.
|
||||
Cleanup/error handling: exits with code 3 for bad args and closes interface on exit.
|
||||
"""
|
||||
|
||||
import argparse
|
||||
import sys
|
||||
|
||||
import meshtastic
|
||||
import meshtastic.serial_interface
|
||||
|
||||
# simple arg check
|
||||
if len(sys.argv) != 1:
|
||||
print(f"usage: {sys.argv[0]}")
|
||||
print("Print the hardware model for the local node.")
|
||||
sys.exit(3)
|
||||
|
||||
iface = meshtastic.serial_interface.SerialInterface()
|
||||
if iface.nodes:
|
||||
for n in iface.nodes.values():
|
||||
if n["num"] == iface.myInfo.my_node_num:
|
||||
print(n["user"]["hwModel"])
|
||||
iface.close()
|
||||
def main() -> int:
|
||||
"""Print the hardware model for the local node."""
|
||||
if len(sys.argv) != 1:
|
||||
print(f"usage: {sys.argv[0]}")
|
||||
print("Print the hardware model for the local node.")
|
||||
return 3
|
||||
|
||||
parser = argparse.ArgumentParser(description="Print local Meshtastic hardware model")
|
||||
parser.parse_args()
|
||||
|
||||
try:
|
||||
with meshtastic.serial_interface.SerialInterface() as iface:
|
||||
if iface.nodes:
|
||||
for node in iface.nodes.values():
|
||||
if node["num"] == iface.myInfo.my_node_num:
|
||||
print(node["user"]["hwModel"])
|
||||
break
|
||||
except KeyboardInterrupt:
|
||||
return 0
|
||||
except Exception as exc:
|
||||
print(f"Error: Could not read hardware model: {exc}")
|
||||
return 1
|
||||
return 0
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
raise SystemExit(main())
|
||||
|
||||
@@ -1,19 +1,38 @@
|
||||
"""Simple program to demo how to use meshtastic library.
|
||||
To run: python examples/hello_world_serial.py
|
||||
"""Send one text message over serial.
|
||||
|
||||
Purpose: minimal send-only example.
|
||||
Transport scope: Serial only.
|
||||
Behavior: sends one message and exits.
|
||||
Expected output: no output on success.
|
||||
Cleanup/error handling: exits with code 3 for bad args, closes interface on exit.
|
||||
"""
|
||||
|
||||
import argparse
|
||||
import sys
|
||||
|
||||
import meshtastic
|
||||
import meshtastic.serial_interface
|
||||
|
||||
# simple arg check
|
||||
if len(sys.argv) < 2:
|
||||
print(f"usage: {sys.argv[0]} message")
|
||||
sys.exit(3)
|
||||
|
||||
# By default will try to find a meshtastic device,
|
||||
# otherwise provide a device path like /dev/ttyUSB0
|
||||
iface = meshtastic.serial_interface.SerialInterface()
|
||||
iface.sendText(sys.argv[1])
|
||||
iface.close()
|
||||
def main() -> int:
|
||||
"""Parse arguments and send one text message."""
|
||||
if len(sys.argv) < 2:
|
||||
print(f"usage: {sys.argv[0]} message")
|
||||
return 3
|
||||
|
||||
parser = argparse.ArgumentParser(description="Send one Meshtastic text message over serial")
|
||||
parser.add_argument("message", help="Message text to broadcast")
|
||||
args = parser.parse_args()
|
||||
|
||||
try:
|
||||
with meshtastic.serial_interface.SerialInterface() as iface:
|
||||
iface.sendText(args.message)
|
||||
except KeyboardInterrupt:
|
||||
return 0
|
||||
except Exception as exc:
|
||||
print(f"Error: Could not send message: {exc}")
|
||||
return 1
|
||||
return 0
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
raise SystemExit(main())
|
||||
|
||||
@@ -1,20 +1,46 @@
|
||||
"""Simple program to demo how to use meshtastic library.
|
||||
To run: python examples/info.py
|
||||
"""Show a concise local node summary over serial.
|
||||
|
||||
Purpose: read local node identity and metadata in one place.
|
||||
Transport scope: Serial only.
|
||||
Behavior: reads node database, prints local node ID/name/hardware model.
|
||||
Expected output: 1-3 summary lines describing the local node.
|
||||
Cleanup/error handling: closes interface on exit and prints clear errors on failure.
|
||||
"""
|
||||
|
||||
import meshtastic
|
||||
import meshtastic.serial_interface
|
||||
|
||||
iface = meshtastic.serial_interface.SerialInterface()
|
||||
|
||||
# call showInfo() just to ensure values are populated
|
||||
# info = iface.showInfo()
|
||||
def main() -> int:
|
||||
"""Print local node summary fields."""
|
||||
try:
|
||||
with meshtastic.serial_interface.SerialInterface() as iface:
|
||||
local_num = iface.myInfo.my_node_num
|
||||
local_node = None
|
||||
if iface.nodes:
|
||||
for node in iface.nodes.values():
|
||||
if node["num"] == local_num:
|
||||
local_node = node
|
||||
break
|
||||
|
||||
if not local_node:
|
||||
print(f"Local node not found in node database (node num: {local_num}).")
|
||||
return 1
|
||||
|
||||
user = local_node.get("user", {})
|
||||
print(f"Node number: {local_num}")
|
||||
print(f"Node ID: {local_node.get('id', 'unknown')}")
|
||||
print(
|
||||
"Name: "
|
||||
f"{user.get('longName', 'unknown')} ({user.get('shortName', 'unknown')})"
|
||||
)
|
||||
print(f"Hardware model: {user.get('hwModel', 'unknown')}")
|
||||
except KeyboardInterrupt:
|
||||
return 0
|
||||
except Exception as exc:
|
||||
print(f"Error: Could not read local node summary: {exc}")
|
||||
return 1
|
||||
return 0
|
||||
|
||||
|
||||
if iface.nodes:
|
||||
for n in iface.nodes.values():
|
||||
if n["num"] == iface.myInfo.my_node_num:
|
||||
print(n["user"]["hwModel"])
|
||||
break
|
||||
|
||||
iface.close()
|
||||
if __name__ == "__main__":
|
||||
raise SystemExit(main())
|
||||
|
||||
64
examples/meshtastic_serial_message_reader.py
Normal file
64
examples/meshtastic_serial_message_reader.py
Normal file
@@ -0,0 +1,64 @@
|
||||
#!/usr/bin/env python3
|
||||
"""Passively monitor incoming text messages over serial.
|
||||
|
||||
Purpose: receive-only monitor for text messages.
|
||||
Transport scope: Serial only.
|
||||
Behavior: subscribes to text receive events and prints timestamp/channel/sender/text.
|
||||
Expected output: one line per received text message.
|
||||
Cleanup/error handling: graceful Ctrl+C exit and explicit connection errors.
|
||||
"""
|
||||
|
||||
import argparse
|
||||
import time
|
||||
from datetime import datetime
|
||||
from typing import Any, Optional
|
||||
|
||||
from pubsub import pub
|
||||
import meshtastic.serial_interface
|
||||
|
||||
_TZ_NAME = time.tzname[time.localtime().tm_isdst > 0]
|
||||
|
||||
|
||||
def on_receive(packet: dict[str, Any], interface: Any) -> None: # pylint: disable=unused-argument
|
||||
"""Print a compact line for each received text packet."""
|
||||
decoded = packet.get("decoded", {})
|
||||
if decoded.get("portnum") != "TEXT_MESSAGE_APP":
|
||||
return
|
||||
|
||||
message = decoded.get("text")
|
||||
if not message:
|
||||
return
|
||||
|
||||
channel_num = packet.get("channel", 0)
|
||||
sender_id = packet.get("fromId", "unknown")
|
||||
message_time = datetime.now().strftime(f"%a %b %d %Y %H:%M:%S {_TZ_NAME}")
|
||||
print(f"{message_time} : {channel_num} : {sender_id} : {message}")
|
||||
|
||||
|
||||
def main() -> int:
|
||||
"""Connect over serial and print inbound text messages."""
|
||||
parser = argparse.ArgumentParser(description="Read incoming Meshtastic text over serial")
|
||||
parser.add_argument("--port", default=None, help="Serial port path (default: auto-detect)")
|
||||
args = parser.parse_args()
|
||||
|
||||
pub.subscribe(on_receive, "meshtastic.receive")
|
||||
|
||||
iface: Optional[meshtastic.serial_interface.SerialInterface] = None
|
||||
try:
|
||||
iface = meshtastic.serial_interface.SerialInterface(devPath=args.port)
|
||||
print("Connected. Listening for text messages. Press Ctrl+C to exit.")
|
||||
while True:
|
||||
time.sleep(1)
|
||||
except KeyboardInterrupt:
|
||||
return 0
|
||||
except Exception as exc:
|
||||
print(f"Error: Could not monitor serial messages: {exc}")
|
||||
return 1
|
||||
finally:
|
||||
if iface:
|
||||
iface.close()
|
||||
return 0
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
raise SystemExit(main())
|
||||
@@ -1,30 +0,0 @@
|
||||
"""Simple program to demo how to use meshtastic library.
|
||||
To run: python examples/pub_sub_example.py
|
||||
"""
|
||||
|
||||
import sys
|
||||
|
||||
from pubsub import pub
|
||||
|
||||
import meshtastic
|
||||
import meshtastic.tcp_interface
|
||||
|
||||
# simple arg check
|
||||
if len(sys.argv) < 2:
|
||||
print(f"usage: {sys.argv[0]} host")
|
||||
sys.exit(1)
|
||||
|
||||
|
||||
def onConnection(interface, topic=pub.AUTO_TOPIC): # pylint: disable=unused-argument
|
||||
"""This is called when we (re)connect to the radio."""
|
||||
print(interface.myInfo)
|
||||
interface.close()
|
||||
|
||||
|
||||
pub.subscribe(onConnection, "meshtastic.connection.established")
|
||||
|
||||
try:
|
||||
iface = meshtastic.tcp_interface.TCPInterface(sys.argv[1])
|
||||
except:
|
||||
print(f"Error: Could not connect to {sys.argv[1]}")
|
||||
sys.exit(1)
|
||||
@@ -1,42 +0,0 @@
|
||||
"""Simple program to demo how to use meshtastic library.
|
||||
To run: python examples/pub_sub_example2.py
|
||||
"""
|
||||
|
||||
import sys
|
||||
import time
|
||||
|
||||
from pubsub import pub
|
||||
|
||||
from meshtastic.tcp_interface import TCPInterface
|
||||
|
||||
# simple arg check
|
||||
if len(sys.argv) < 2:
|
||||
print(f"usage: {sys.argv[0]} host")
|
||||
sys.exit(1)
|
||||
|
||||
|
||||
def onReceive(packet, interface): # pylint: disable=unused-argument
|
||||
"""called when a packet arrives"""
|
||||
print(f"Received: {packet}")
|
||||
|
||||
|
||||
def onConnection(interface, topic=pub.AUTO_TOPIC): # pylint: disable=unused-argument
|
||||
"""called when we (re)connect to the radio"""
|
||||
# defaults to broadcast, specify a destination ID if you wish
|
||||
interface.sendText("hello mesh")
|
||||
|
||||
|
||||
pub.subscribe(onReceive, "meshtastic.receive")
|
||||
pub.subscribe(onConnection, "meshtastic.connection.established")
|
||||
|
||||
iface=None
|
||||
try:
|
||||
iface = TCPInterface(hostname=sys.argv[1])
|
||||
while True:
|
||||
time.sleep(1000)
|
||||
except Exception as ex:
|
||||
print(f"Error: Could not connect to {sys.argv[1]} {ex}")
|
||||
raise
|
||||
finally:
|
||||
if iface:
|
||||
iface.close()
|
||||
@@ -1,7 +1,10 @@
|
||||
"""Reply message demo script.
|
||||
To run: python examples/replymessage.py
|
||||
To run with TCP: python examples/replymessage.py --host 192.168.1.5
|
||||
To run with BLE: python examples/replymessage.py --ble 24:62:AB:DD:DF:3A
|
||||
"""Auto-reply to received text messages.
|
||||
|
||||
Purpose: demonstrate receive callback + generated reply flow.
|
||||
Transport scope: Serial default, optional TCP/BLE.
|
||||
Behavior: listens for text, prints message metadata, sends one reply per text message.
|
||||
Expected output: "Connected..." plus message/reply lines while running.
|
||||
Cleanup/error handling: clear connect failures and graceful Ctrl+C close.
|
||||
"""
|
||||
|
||||
import argparse
|
||||
@@ -13,7 +16,7 @@ import meshtastic.tcp_interface
|
||||
import meshtastic.ble_interface
|
||||
from meshtastic.mesh_interface import MeshInterface
|
||||
|
||||
def onReceive(packet: dict, interface: MeshInterface) -> None: # pylint: disable=unused-argument
|
||||
def onReceive(packet: dict, interface: MeshInterface) -> None:
|
||||
"""Reply to every received packet with some info"""
|
||||
text: Optional[str] = packet.get("decoded", {}).get("text")
|
||||
if text:
|
||||
@@ -66,8 +69,5 @@ try:
|
||||
except KeyboardInterrupt:
|
||||
pass
|
||||
finally:
|
||||
try:
|
||||
if iface:
|
||||
iface.close()
|
||||
except AttributeError:
|
||||
pass
|
||||
if iface:
|
||||
iface.close()
|
||||
|
||||
@@ -1,7 +1,13 @@
|
||||
"""Program to scan for hardware
|
||||
To run: python examples/scan_for_devices.py
|
||||
"""Scan host serial hardware for supported Meshtastic devices.
|
||||
|
||||
Purpose: host-side discovery without opening a radio session.
|
||||
Transport scope: none (OS/device scanning only).
|
||||
Behavior: scans vendor IDs, lists matched devices, and candidate active ports.
|
||||
Expected output: vendor ID list, zero-or-more detected devices, and port list.
|
||||
Cleanup/error handling: exits with code 3 for bad args and code 1 on scan errors.
|
||||
"""
|
||||
|
||||
import argparse
|
||||
import sys
|
||||
|
||||
from meshtastic.util import (
|
||||
@@ -10,20 +16,38 @@ from meshtastic.util import (
|
||||
get_unique_vendor_ids,
|
||||
)
|
||||
|
||||
# simple arg check
|
||||
if len(sys.argv) != 1:
|
||||
print(f"usage: {sys.argv[0]}")
|
||||
print("Detect which device we might have.")
|
||||
sys.exit(3)
|
||||
|
||||
vids = get_unique_vendor_ids()
|
||||
print(f"Searching for all devices with these vendor ids {vids}")
|
||||
def main() -> int:
|
||||
"""Run device detection and print candidate ports."""
|
||||
if len(sys.argv) != 1:
|
||||
print(f"usage: {sys.argv[0]}")
|
||||
print("Detect which device we might have.")
|
||||
return 3
|
||||
|
||||
sds = detect_supported_devices()
|
||||
if len(sds) > 0:
|
||||
print("Detected possible devices:")
|
||||
for d in sds:
|
||||
print(f" name:{d.name}{d.version} firmware:{d.for_firmware}")
|
||||
parser = argparse.ArgumentParser(description="Scan host for supported Meshtastic devices")
|
||||
parser.parse_args()
|
||||
|
||||
ports = active_ports_on_supported_devices(sds)
|
||||
print(f"ports:{ports}")
|
||||
try:
|
||||
vids = get_unique_vendor_ids()
|
||||
print(f"Searching for all devices with these vendor ids {vids}")
|
||||
|
||||
supported_devices = detect_supported_devices()
|
||||
if supported_devices:
|
||||
print("Detected possible devices:")
|
||||
for device in supported_devices:
|
||||
print(
|
||||
f" name:{device.name}{device.version} firmware:{device.for_firmware}"
|
||||
)
|
||||
else:
|
||||
print("Detected possible devices: none")
|
||||
|
||||
ports = active_ports_on_supported_devices(supported_devices)
|
||||
print(f"ports:{ports}")
|
||||
except Exception as exc:
|
||||
print(f"Error: device scan failed: {exc}")
|
||||
return 1
|
||||
return 0
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
raise SystemExit(main())
|
||||
|
||||
@@ -1,21 +1,40 @@
|
||||
"""Simple program to demo how to use meshtastic library.
|
||||
To run: python examples/set_owner.py Bobby 333
|
||||
"""Set local owner long/short name over serial.
|
||||
|
||||
Purpose: demonstrate a local config mutation workflow.
|
||||
Transport scope: Serial only.
|
||||
Behavior: updates owner long name and optional short name.
|
||||
Expected output: prints the owner values being applied.
|
||||
Cleanup/error handling: exits with code 3 for bad args and closes interface on exit.
|
||||
"""
|
||||
|
||||
import argparse
|
||||
import sys
|
||||
|
||||
import meshtastic
|
||||
import meshtastic.serial_interface
|
||||
|
||||
# simple arg check
|
||||
if len(sys.argv) < 2:
|
||||
print(f"usage: {sys.argv[0]} long_name [short_name]")
|
||||
sys.exit(3)
|
||||
|
||||
iface = meshtastic.serial_interface.SerialInterface()
|
||||
long_name = sys.argv[1]
|
||||
short_name = None
|
||||
if len(sys.argv) > 2:
|
||||
short_name = sys.argv[2]
|
||||
iface.localNode.setOwner(long_name, short_name)
|
||||
iface.close()
|
||||
def main() -> int:
|
||||
"""Parse args and set owner fields."""
|
||||
if len(sys.argv) < 2:
|
||||
print(f"usage: {sys.argv[0]} long_name [short_name]")
|
||||
return 3
|
||||
|
||||
parser = argparse.ArgumentParser(description="Set Meshtastic local owner information")
|
||||
parser.add_argument("long_name", help="Owner long name")
|
||||
parser.add_argument("short_name", nargs="?", default=None, help="Owner short name")
|
||||
args = parser.parse_args()
|
||||
|
||||
print(f"Setting owner long_name={args.long_name}, short_name={args.short_name}")
|
||||
try:
|
||||
with meshtastic.serial_interface.SerialInterface() as iface:
|
||||
iface.localNode.setOwner(args.long_name, args.short_name)
|
||||
except KeyboardInterrupt:
|
||||
return 0
|
||||
except Exception as exc:
|
||||
print(f"Error: Could not set owner: {exc}")
|
||||
return 1
|
||||
return 0
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
raise SystemExit(main())
|
||||
|
||||
@@ -1,6 +1,24 @@
|
||||
"""Simple program to show serial ports.
|
||||
"""List serial ports currently visible to Meshtastic helpers.
|
||||
|
||||
Purpose: fastest host-side serial port enumeration.
|
||||
Transport scope: none (host serial listing only).
|
||||
Behavior: prints result of `findPorts()`.
|
||||
Expected output: list-like representation of available candidate ports.
|
||||
Cleanup/error handling: exits with code 1 on unexpected scan error.
|
||||
"""
|
||||
|
||||
from meshtastic.util import findPorts
|
||||
|
||||
print(findPorts())
|
||||
|
||||
def main() -> int:
|
||||
"""Print discovered serial ports."""
|
||||
try:
|
||||
print(findPorts())
|
||||
except Exception as exc:
|
||||
print(f"Error: Could not list ports: {exc}")
|
||||
return 1
|
||||
return 0
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
raise SystemExit(main())
|
||||
|
||||
41
examples/tcp_connection_info_once.py
Normal file
41
examples/tcp_connection_info_once.py
Normal file
@@ -0,0 +1,41 @@
|
||||
"""Connect over TCP, print connection info once, then exit.
|
||||
|
||||
Purpose: demonstrate pubsub connection lifecycle callback.
|
||||
Transport scope: TCP only.
|
||||
Behavior: subscribe to `meshtastic.connection.established`, print `myInfo`, then close.
|
||||
Expected output: one object/line showing local radio info after connect.
|
||||
Cleanup/error handling: explicit connect failure message and clean close on callback.
|
||||
"""
|
||||
|
||||
import argparse
|
||||
|
||||
from pubsub import pub
|
||||
import meshtastic.tcp_interface
|
||||
|
||||
|
||||
def on_connection(interface, topic=pub.AUTO_TOPIC): # pylint: disable=unused-argument
|
||||
"""Print local radio info when connected, then close."""
|
||||
print(interface.myInfo)
|
||||
interface.close()
|
||||
|
||||
|
||||
def main() -> int:
|
||||
"""Parse args, connect, and wait for established callback."""
|
||||
parser = argparse.ArgumentParser(description="Print radio info on TCP connect and exit")
|
||||
parser.add_argument("host", help="TCP hostname or IP of the Meshtastic node")
|
||||
args = parser.parse_args()
|
||||
|
||||
pub.subscribe(on_connection, "meshtastic.connection.established")
|
||||
|
||||
try:
|
||||
meshtastic.tcp_interface.TCPInterface(args.host)
|
||||
except KeyboardInterrupt:
|
||||
return 0
|
||||
except Exception as exc:
|
||||
print(f"Error: Could not connect to {args.host}: {exc}")
|
||||
return 1
|
||||
return 0
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
raise SystemExit(main())
|
||||
@@ -1,14 +1,47 @@
|
||||
"""Demonstration of how to look up a radio's location via its LAN connection.
|
||||
Before running, connect your machine to the same WiFi network as the radio.
|
||||
"""
|
||||
"""Look up local node position over TCP.
|
||||
|
||||
Purpose: demonstrate read-only position lookup via LAN/TCP.
|
||||
Transport scope: TCP only.
|
||||
Behavior: connects, reads local node position, prints it, then exits.
|
||||
Expected output: position dict for local node.
|
||||
Cleanup/error handling: explicit connect/read failures and clean close.
|
||||
"""
|
||||
# pylint: disable=duplicate-code
|
||||
|
||||
import argparse
|
||||
|
||||
import meshtastic
|
||||
import meshtastic.tcp_interface
|
||||
|
||||
radio_hostname = "meshtastic.local" # Can also be an IP
|
||||
iface = meshtastic.tcp_interface.TCPInterface(radio_hostname)
|
||||
my_node_num = iface.myInfo.my_node_num
|
||||
pos = iface.nodesByNum[my_node_num]["position"]
|
||||
print(pos)
|
||||
|
||||
iface.close()
|
||||
def main() -> int:
|
||||
"""Connect over TCP and print local node position."""
|
||||
parser = argparse.ArgumentParser(description="Print local node position over TCP")
|
||||
parser.add_argument(
|
||||
"--host",
|
||||
default="meshtastic.local",
|
||||
help="TCP hostname or IP (default: meshtastic.local)",
|
||||
)
|
||||
args = parser.parse_args()
|
||||
|
||||
iface = None
|
||||
try:
|
||||
iface = meshtastic.tcp_interface.TCPInterface(args.host)
|
||||
my_node_num = iface.myInfo.my_node_num
|
||||
pos = iface.nodesByNum[my_node_num].get("position")
|
||||
if pos is None:
|
||||
print(f"No position available for local node {my_node_num}")
|
||||
return 1
|
||||
print(pos)
|
||||
except KeyboardInterrupt:
|
||||
return 0
|
||||
except Exception as exc:
|
||||
print(f"Error: Could not read position from {args.host}: {exc}")
|
||||
return 1
|
||||
finally:
|
||||
if iface:
|
||||
iface.close()
|
||||
return 0
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
raise SystemExit(main())
|
||||
|
||||
54
examples/tcp_pubsub_send_and_receive.py
Normal file
54
examples/tcp_pubsub_send_and_receive.py
Normal file
@@ -0,0 +1,54 @@
|
||||
"""Send once on connect and print received packets over TCP.
|
||||
|
||||
Purpose: demonstrate pubsub send-on-connect plus receive callback flow.
|
||||
Transport scope: TCP only.
|
||||
Behavior: sends "hello mesh" at connect, prints packets while running.
|
||||
Expected output: "Connected..." plus "Received: ..." lines for inbound packets.
|
||||
Cleanup/error handling: graceful Ctrl+C exit and clean interface close.
|
||||
"""
|
||||
|
||||
import argparse
|
||||
import time
|
||||
|
||||
from pubsub import pub
|
||||
from meshtastic.tcp_interface import TCPInterface
|
||||
|
||||
|
||||
def on_receive(packet, interface): # pylint: disable=unused-argument
|
||||
"""Print each inbound packet."""
|
||||
print(f"Received: {packet}")
|
||||
|
||||
|
||||
def on_connection(interface, topic=pub.AUTO_TOPIC): # pylint: disable=unused-argument
|
||||
"""Send a broadcast text when connected."""
|
||||
print("Connected. Sending one broadcast message.")
|
||||
interface.sendText("hello mesh")
|
||||
|
||||
|
||||
def main() -> int:
|
||||
"""Parse args, connect via TCP, and run callbacks."""
|
||||
parser = argparse.ArgumentParser(description="TCP pubsub send-and-receive example")
|
||||
parser.add_argument("host", help="TCP hostname or IP of the Meshtastic node")
|
||||
args = parser.parse_args()
|
||||
|
||||
pub.subscribe(on_receive, "meshtastic.receive")
|
||||
pub.subscribe(on_connection, "meshtastic.connection.established")
|
||||
|
||||
iface = None
|
||||
try:
|
||||
iface = TCPInterface(hostname=args.host)
|
||||
while True:
|
||||
time.sleep(1)
|
||||
except KeyboardInterrupt:
|
||||
return 0
|
||||
except Exception as exc:
|
||||
print(f"Error: Could not connect to {args.host}: {exc}")
|
||||
return 1
|
||||
finally:
|
||||
if iface:
|
||||
iface.close()
|
||||
return 0
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
raise SystemExit(main())
|
||||
@@ -1,7 +1,10 @@
|
||||
"""Simple text chat demo for meshtastic.
|
||||
To run: python examples/textchat.py
|
||||
To run with TCP: python examples/textchat.py --host 192.168.1.5
|
||||
To run with BLE: python examples/textchat.py --ble 24:62:AB:DD:DF:3A
|
||||
"""Interactive text chat demo.
|
||||
|
||||
Purpose: demonstrate bidirectional text chat loop.
|
||||
Transport scope: Serial default, optional TCP/BLE.
|
||||
Behavior: prints incoming messages and sends each typed line as text.
|
||||
Expected output: incoming sender/text lines and sent messages reaching peers.
|
||||
Cleanup/error handling: explicit connect errors and graceful Ctrl+C / EOF close.
|
||||
"""
|
||||
|
||||
import argparse
|
||||
@@ -66,8 +69,5 @@ except KeyboardInterrupt:
|
||||
except EOFError:
|
||||
pass
|
||||
finally:
|
||||
try:
|
||||
if iface:
|
||||
iface.close()
|
||||
except AttributeError:
|
||||
pass
|
||||
if iface:
|
||||
iface.close()
|
||||
|
||||
@@ -1,7 +1,10 @@
|
||||
"""Program to create and delete waypoint
|
||||
To run:
|
||||
python3 examples/waypoint.py --port /dev/ttyUSB0 create 45 test the_desc_2 '2024-12-18T23:05:23' 48.74 7.35
|
||||
python3 examples/waypoint.py delete 45
|
||||
"""Create or delete a waypoint.
|
||||
|
||||
Purpose: demonstrate waypoint mutation API (create/delete).
|
||||
Transport scope: Serial only.
|
||||
Behavior: sends waypoint create/delete request and prints API response.
|
||||
Expected output: request response object printed to stdout.
|
||||
Cleanup/error handling: explicit argument parsing and clean interface close.
|
||||
"""
|
||||
|
||||
import argparse
|
||||
@@ -12,26 +15,28 @@ import meshtastic
|
||||
import meshtastic.serial_interface
|
||||
|
||||
parser = argparse.ArgumentParser(
|
||||
prog='waypoint',
|
||||
description='Create and delete Meshtastic waypoint')
|
||||
parser.add_argument('--port', default=None)
|
||||
parser.add_argument('--debug', default=False, action='store_true')
|
||||
prog="waypoint", description="Create and delete Meshtastic waypoint"
|
||||
)
|
||||
parser.add_argument("--port", default=None)
|
||||
parser.add_argument("--debug", default=False, action="store_true")
|
||||
|
||||
subparsers = parser.add_subparsers(dest='cmd')
|
||||
parser_delete = subparsers.add_parser('delete', help='Delete a waypoint')
|
||||
parser_delete.add_argument('id', help="id of the waypoint")
|
||||
subparsers = parser.add_subparsers(dest="cmd", required=True)
|
||||
parser_delete = subparsers.add_parser("delete", help="Delete a waypoint")
|
||||
parser_delete.add_argument("id", type=int, help="ID of the waypoint")
|
||||
|
||||
parser_create = subparsers.add_parser('create', help='Create a new waypoint')
|
||||
parser_create.add_argument('id', help="id of the waypoint")
|
||||
parser_create.add_argument('name', help="name of the waypoint")
|
||||
parser_create.add_argument('description', help="description of the waypoint")
|
||||
parser_create.add_argument('icon', help="icon of the waypoint")
|
||||
parser_create.add_argument('expire', help="expiration date of the waypoint as interpreted by datetime.fromisoformat")
|
||||
parser_create.add_argument('latitude', help="latitude of the waypoint")
|
||||
parser_create.add_argument('longitude', help="longitude of the waypoint")
|
||||
parser_create = subparsers.add_parser("create", help="Create a new waypoint")
|
||||
parser_create.add_argument("id", type=int, help="ID of the waypoint")
|
||||
parser_create.add_argument("name", help="Name of the waypoint")
|
||||
parser_create.add_argument("description", help="Description of the waypoint")
|
||||
parser_create.add_argument("icon", help="Icon of the waypoint")
|
||||
parser_create.add_argument(
|
||||
"expire",
|
||||
help="Expiration time as ISO timestamp accepted by datetime.fromisoformat",
|
||||
)
|
||||
parser_create.add_argument("latitude", type=float, help="Latitude of the waypoint")
|
||||
parser_create.add_argument("longitude", type=float, help="Longitude of the waypoint")
|
||||
|
||||
args = parser.parse_args()
|
||||
print(args)
|
||||
|
||||
# By default will try to find a meshtastic device,
|
||||
# otherwise provide a device path like /dev/ttyUSB0
|
||||
@@ -40,18 +45,16 @@ if args.debug:
|
||||
else:
|
||||
d = None
|
||||
with meshtastic.serial_interface.SerialInterface(args.port, debugOut=d) as iface:
|
||||
if args.cmd == 'create':
|
||||
if args.cmd == "create":
|
||||
p = iface.sendWaypoint(
|
||||
waypoint_id=int(args.id),
|
||||
waypoint_id=args.id,
|
||||
name=args.name,
|
||||
description=args.description,
|
||||
icon=args.icon,
|
||||
expire=int(datetime.datetime.fromisoformat(args.expire).timestamp()),
|
||||
latitude=float(args.latitude),
|
||||
longitude=float(args.longitude),
|
||||
latitude=args.latitude,
|
||||
longitude=args.longitude,
|
||||
)
|
||||
else:
|
||||
p = iface.deleteWaypoint(int(args.id))
|
||||
p = iface.deleteWaypoint(args.id)
|
||||
print(p)
|
||||
|
||||
# iface.close()
|
||||
|
||||
Reference in New Issue
Block a user