mirror of
https://github.com/meshtastic/python.git
synced 2025-12-30 11:27:53 -05:00
add support for creating admin channels
This commit is contained in:
4
.vscode/launch.json
vendored
4
.vscode/launch.json
vendored
@@ -13,12 +13,12 @@
|
||||
"args": ["--debug", "--ble", "--device", "24:62:AB:DD:DF:3A"]
|
||||
},
|
||||
{
|
||||
"name": "meshtastic info",
|
||||
"name": "meshtastic admin",
|
||||
"type": "python",
|
||||
"request": "launch",
|
||||
"module": "meshtastic",
|
||||
"justMyCode": true,
|
||||
"args": ["--info", "--debug"]
|
||||
"args": ["--debug", "--setch-longslow", "--port", "/dev/ttyUSB1"]
|
||||
},
|
||||
{
|
||||
"name": "meshtastic tunnel",
|
||||
|
||||
@@ -180,6 +180,28 @@ class Node:
|
||||
self._sendAdmin(p)
|
||||
logging.debug("Wrote channel {channelIndex}")
|
||||
|
||||
def getChannelByName(self, name):
|
||||
"""Try to find the named channel or return None"""
|
||||
for c in (self.channels or []):
|
||||
if c.settings and c.settings.name == name:
|
||||
return c
|
||||
return None
|
||||
|
||||
def getDisabledChannel(self):
|
||||
"""Return the first channel that is disabled (i.e. available for some new use)"""
|
||||
for c in self.channels:
|
||||
if c.role == channel_pb2.Channel.Role.DISABLED:
|
||||
return c
|
||||
return None
|
||||
|
||||
def _getAdminChannelIndex(self):
|
||||
"""Return the channel number of the admin channel, or 0 if no reserved channel"""
|
||||
c = self.getChannelByName("admin")
|
||||
if c:
|
||||
return c.index
|
||||
else:
|
||||
return 0
|
||||
|
||||
def setOwner(self, long_name, short_name=None):
|
||||
"""Set device owner name"""
|
||||
nChars = 3
|
||||
@@ -295,6 +317,16 @@ class Node:
|
||||
|
||||
if quitEarly or index >= self.iface.myInfo.max_channels - 1:
|
||||
logging.debug("Finished downloading channels")
|
||||
|
||||
# Fill the rest of array with DISABLED channels
|
||||
index += 1
|
||||
while index < self.iface.myInfo.max_channels:
|
||||
ch = channel_pb2.Channel()
|
||||
ch.role = channel_pb2.Channel.Role.DISABLED
|
||||
ch.index = index
|
||||
self.partialChannels.append(ch)
|
||||
index += 1
|
||||
|
||||
self.channels = self.partialChannels
|
||||
# FIXME, the following should only be called after we have settings and channels
|
||||
self.iface._connected() # Tell everone else we are ready to go
|
||||
@@ -313,7 +345,8 @@ class Node:
|
||||
portNum=portnums_pb2.PortNum.ADMIN_APP,
|
||||
wantAck=True,
|
||||
wantResponse=wantResponse,
|
||||
onResponse=onResponse)
|
||||
onResponse=onResponse,
|
||||
channelIndex=self.iface.localNode._getAdminChannelIndex())
|
||||
|
||||
|
||||
class MeshInterface:
|
||||
@@ -404,7 +437,8 @@ class MeshInterface:
|
||||
portNum=portnums_pb2.PortNum.PRIVATE_APP, wantAck=False,
|
||||
wantResponse=False,
|
||||
hopLimit=defaultHopLimit,
|
||||
onResponse=None):
|
||||
onResponse=None,
|
||||
channelIndex=0):
|
||||
"""Send a data packet to some other node
|
||||
|
||||
Keyword Arguments:
|
||||
@@ -428,6 +462,7 @@ class MeshInterface:
|
||||
raise Exception("A non-zero port number must be specified")
|
||||
|
||||
meshPacket = mesh_pb2.MeshPacket()
|
||||
meshPacket.channel = channelIndex
|
||||
meshPacket.decoded.payload = data
|
||||
meshPacket.decoded.portnum = portNum
|
||||
meshPacket.decoded.want_response = wantResponse
|
||||
|
||||
@@ -2,10 +2,7 @@
|
||||
|
||||
import argparse
|
||||
import platform
|
||||
import logging
|
||||
import sys
|
||||
import codecs
|
||||
import base64
|
||||
import logging, sys, codecs, base64, os
|
||||
from . import SerialInterface, TCPInterface, BLEInterface, test, remote_hardware
|
||||
from pubsub import pub
|
||||
from . import mesh_pb2, portnums_pb2, channel_pb2
|
||||
@@ -64,6 +61,19 @@ def onConnection(interface, topic=pub.AUTO_TOPIC):
|
||||
trueTerms = {"t", "true", "yes"}
|
||||
falseTerms = {"f", "false", "no"}
|
||||
|
||||
def genPSKS256():
|
||||
return os.urandom(32)
|
||||
|
||||
def fromPSK(valstr):
|
||||
"""A special version of fromStr that assumes the user is trying to set a PSK.
|
||||
In that case we also allow "none" or "random" (to have python generate one)
|
||||
"""
|
||||
if valstr == "random":
|
||||
return genPSK256()
|
||||
elif valstr == "none":
|
||||
return bytes()
|
||||
else:
|
||||
return fromStr(valstr)
|
||||
|
||||
def fromStr(valstr):
|
||||
"""try to parse as int, float or bool (and fallback to a string as last resort)
|
||||
@@ -269,35 +279,71 @@ def onConnected(interface):
|
||||
getNode().setURL(args.seturl)
|
||||
|
||||
# handle changing channels
|
||||
|
||||
if args.ch_enable_admin:
|
||||
closeNow = True
|
||||
n = getNode()
|
||||
ch = n.getChannelByName("admin")
|
||||
if ch:
|
||||
logging.error("This node already is configured for remote admin access - no changes.")
|
||||
else:
|
||||
ch = n.getDisabledChannel()
|
||||
if not ch:
|
||||
raise Exception("No free channels were found")
|
||||
chs = channel_pb2.ChannelSettings()
|
||||
chs.psk = genPSKS256()
|
||||
chs.name = "admin"
|
||||
ch.settings.CopyFrom(chs)
|
||||
ch.role = channel_pb2.Channel.Role.SECONDARY
|
||||
print(f"Writing modified channels to device")
|
||||
n.writeChannel(ch.index)
|
||||
|
||||
if args.setchan or args.setch_longslow or args.setch_shortfast:
|
||||
closeNow = True
|
||||
|
||||
ch = getNode().channels[channelIndex]
|
||||
|
||||
def setSimpleChannel(modem_config):
|
||||
"""Set one of the simple modem_config only based channels"""
|
||||
enable = args.ch_enable # should we enable this channel?
|
||||
|
||||
# Completely new channel settings
|
||||
chs = channel_pb2.ChannelSettings()
|
||||
chs.modem_config = modem_config
|
||||
chs.psk = bytes([1]) # Use default channel psk 1
|
||||
if args.setch_longslow or args.setch_shortfast:
|
||||
if channelIndex != 0:
|
||||
raise Exception("standard channel settings can only be applied to the PRIMARY channel")
|
||||
|
||||
ch.settings.CopyFrom(chs)
|
||||
enable = True # force enable
|
||||
|
||||
# handle the simple channel set commands
|
||||
if args.setch_longslow:
|
||||
setSimpleChannel(
|
||||
channel_pb2.ChannelSettings.ModemConfig.Bw125Cr48Sf4096)
|
||||
def setSimpleChannel(modem_config):
|
||||
"""Set one of the simple modem_config only based channels"""
|
||||
|
||||
if args.setch_shortfast:
|
||||
setSimpleChannel(
|
||||
channel_pb2.ChannelSettings.ModemConfig.Bw500Cr45Sf128)
|
||||
# Completely new channel settings
|
||||
chs = channel_pb2.ChannelSettings()
|
||||
chs.modem_config = modem_config
|
||||
chs.psk = bytes([1]) # Use default channel psk 1
|
||||
|
||||
ch.settings.CopyFrom(chs)
|
||||
|
||||
# handle the simple channel set commands
|
||||
if args.setch_longslow:
|
||||
setSimpleChannel(
|
||||
channel_pb2.ChannelSettings.ModemConfig.Bw125Cr48Sf4096)
|
||||
|
||||
if args.setch_shortfast:
|
||||
setSimpleChannel(
|
||||
channel_pb2.ChannelSettings.ModemConfig.Bw500Cr45Sf128)
|
||||
|
||||
# Handle the channel settings
|
||||
for pref in (args.setchan or []):
|
||||
setPref(ch.settings, pref[0], pref[1])
|
||||
if pref[0] == "psk":
|
||||
setattr(ch.settings, pref[0], fromPSK(pref[1]))
|
||||
else:
|
||||
setPref(ch.settings, pref[0], pref[1])
|
||||
enable = True # If we set any pref, assume the user wants to enable the channel
|
||||
|
||||
print("Writing modified channels to device")
|
||||
if enable:
|
||||
ch.role = channel_pb2.Channel.Role.PRIMARY if (channelIndex == 0) else channel_pb2.Channel.Role.SECONDARY
|
||||
else:
|
||||
ch.role = channel_pb2.Channel.Role.DISABLED
|
||||
|
||||
print(f"Writing modified channels to device")
|
||||
getNode().writeChannel(channelIndex)
|
||||
|
||||
if args.info:
|
||||
@@ -354,6 +400,10 @@ def common():
|
||||
parser.print_help(sys.stderr)
|
||||
sys.exit(1)
|
||||
else:
|
||||
if args.ch_index is not None:
|
||||
global channelIndex
|
||||
channelIndex = int(args.ch_index)
|
||||
|
||||
# Some commands require dest to be set, so we now use destOrAll/destOrLocal for more lenient commands
|
||||
if not args.dest:
|
||||
args.destOrAll = "^all"
|
||||
@@ -366,7 +416,7 @@ def common():
|
||||
if args.info or args.nodes or args.set or args.seturl or args.setowner or args.setlat or args.setlon or \
|
||||
args.settime or \
|
||||
args.setch_longslow or args.setch_shortfast or args.setchan or args.sendtext or \
|
||||
args.qr:
|
||||
args.qr or args.ch_enable_admin:
|
||||
args.seriallog = "none" # assume no debug output in this case
|
||||
else:
|
||||
args.seriallog = "stdout" # default to stdout
|
||||
@@ -443,6 +493,18 @@ def initParser():
|
||||
parser.add_argument(
|
||||
"--seturl", help="Set a channel URL", action="store")
|
||||
|
||||
parser.add_argument(
|
||||
"--ch-index", help="Set the specified channel index", action="store")
|
||||
|
||||
parser.add_argument(
|
||||
"--ch-enable", help="Enable the specified channel", action="store_true", dest="ch_enable")
|
||||
|
||||
parser.add_argument(
|
||||
"--ch-disable", help="Disable the specified channel", action="store_false", dest="ch_enable")
|
||||
|
||||
parser.add_argument(
|
||||
"--ch-enable-admin", help="Assign a PSK to the admin channel, to allow remote adminstration", action="store_true")
|
||||
|
||||
parser.add_argument(
|
||||
"--setowner", help="Set device owner name", action="store")
|
||||
|
||||
|
||||
Reference in New Issue
Block a user