'--set' works as well, also chained commands

This commit is contained in:
GUVWAF
2023-03-18 21:12:05 +01:00
parent 802768e0cc
commit b2cfebc5a7
2 changed files with 53 additions and 30 deletions

View File

@@ -53,11 +53,11 @@ def onConnection(interface, topic=pub.AUTO_TOPIC): # pylint: disable=W0613
print(f"Connection changed: {topic.getName()}")
def getPref(interface, dest, comp_name):
def getPref(node, comp_name):
"""Get a channel or preferences value"""
name = splitCompoundName(comp_name)
fullConfig = name[0] == name[1] # We want the full config
wholeField = name[0] == name[1] # We want the whole field
camel_name = meshtastic.util.snake_to_camel(name[1])
# Note: protobufs has the keys in snake_case, so snake internally
@@ -65,9 +65,9 @@ def getPref(interface, dest, comp_name):
logging.debug(f'snake_name:{snake_name} camel_name:{camel_name}')
logging.debug(f'use camel:{Globals.getInstance().get_camel_case()}')
# First validate the input by looking at config of connected node
localConfig = interface.getNode(BROADCAST_ADDR).localConfig
moduleConfig = interface.getNode(BROADCAST_ADDR).moduleConfig
# First validate the input
localConfig = node.localConfig
moduleConfig = node.moduleConfig
found = False
for config in [localConfig, moduleConfig]:
objDesc = config.DESCRIPTOR
@@ -75,7 +75,7 @@ def getPref(interface, dest, comp_name):
pref = False
if config_type:
pref = config_type.message_type.fields_by_name.get(snake_name)
if pref or fullConfig:
if pref or wholeField:
found = True
break
@@ -89,10 +89,11 @@ def getPref(interface, dest, comp_name):
printConfig(moduleConfig)
return False
if dest == BROADCAST_ADDR:
# Check if we need to request the config
if len(config.ListFields()) != 0:
# read the value
config_values = getattr(config, config_type.name)
if not fullConfig:
if not wholeField:
pref_value = getattr(config_values, pref.name)
if Globals.getInstance().get_camel_case():
print(f"{str(config_type.name)}.{camel_name}: {str(pref_value)}")
@@ -101,11 +102,11 @@ def getPref(interface, dest, comp_name):
print(f"{str(config_type.name)}.{snake_name}: {str(pref_value)}")
logging.debug(f"{str(config_type.name)}.{snake_name}: {str(pref_value)}")
else:
print(f"{str(config_type.name)}: {str(config_values)}")
print(f"{str(config_type.name)}:\n{str(config_values)}")
logging.debug(f"{str(config_type.name)}: {str(config_values)}")
else:
# Always show full config for remote node
interface.getNode(dest, False).requestConfig(config_type)
# Always show whole field for remote node
node.requestConfig(config_type)
return True
@@ -405,18 +406,25 @@ def onConnected(interface):
# handle settings
if args.set:
closeNow = True
node = interface.getNode(args.dest)
node = interface.getNode(args.dest, False)
# Handle the int/float/bool arguments
pref = None
for pref in args.set:
found = setPref(node.localConfig, pref[0], pref[1])
if not found:
found = setPref(node.moduleConfig, pref[0], pref[1])
found = False
field = splitCompoundName(pref[0].lower())[0]
for config in [node.localConfig, node.moduleConfig]:
config_type = config.DESCRIPTOR.fields_by_name.get(field)
if config_type:
if len(config.ListFields()) == 0:
node.requestConfig(config.DESCRIPTOR.fields_by_name.get(field))
found = setPref(config, pref[0], pref[1])
if found:
break
if found:
print("Writing modified preferences to device")
interface.getNode(args.dest).writeConfig(splitCompoundName(pref[0].lower())[0])
node.writeConfig(field)
else:
if Globals.getInstance().get_camel_case():
print(f"{node.localConfig.__class__.__name__} and {node.moduleConfig.__class__.__name__} do not have an attribute {pref[0]}.")
@@ -630,8 +638,9 @@ def onConnected(interface):
if args.get:
closeNow = True
node = interface.getNode(args.dest, False)
for pref in args.get:
found = getPref(interface, args.dest, pref[0])
found = getPref(node, pref[0])
if found:
print("Completed getting preferences")

View File

@@ -6,7 +6,7 @@ import base64
import time
from google.protobuf.json_format import MessageToJson
from meshtastic import portnums_pb2, apponly_pb2, admin_pb2, channel_pb2, localonly_pb2
from meshtastic.util import pskToString, stripnl, Timeout, our_exit, fromPSK
from meshtastic.util import pskToString, stripnl, Timeout, our_exit, fromPSK, camel_to_snake
class Node:
@@ -80,29 +80,43 @@ class Node:
self.iface._acknowledgment.receivedAck = True
print("")
if "getConfigResponse" in p["decoded"]["admin"]:
prefs = stripnl(p["decoded"]["admin"]["getConfigResponse"])
print(f"Preferences: {prefs}\n")
resp = p["decoded"]["admin"]["getConfigResponse"]
field = list(resp.keys())[0]
config_type = self.localConfig.DESCRIPTOR.fields_by_name.get(field)
config_values = getattr(self.localConfig, config_type.name)
else:
prefs = stripnl(p["decoded"]["admin"]["getModuleConfigResponse"])
print(f"Module preferences: {prefs}\n")
resp = p["decoded"]["admin"]["getModuleConfigResponse"]
field = list(resp.keys())[0]
config_type = self.moduleConfig.DESCRIPTOR.fields_by_name.get(field)
config_values = getattr(self.moduleConfig, config_type.name)
for key, value in resp[field].items():
setattr(config_values, camel_to_snake(key), value)
print(f"{str(field)}:\n{str(config_values)}")
def requestConfig(self, configType):
print("Requesting config from remote node (this can take a while).")
print("Be sure:")
print(" 1. There is a SECONDARY channel named 'admin'.")
print(" 2. The '--seturl' was used to configure.")
print(" 3. All devices have the same modem config. (i.e., '--ch-longfast')")
localOnly = False
if self == self.iface.localNode:
localOnly = True
onResponse = None
else:
onResponse = self.onResponseRequestSettings
print("Requesting config from remote node (this can take a while).")
print("Be sure:")
print(" 1. There is a SECONDARY channel named 'admin'.")
print(" 2. The '--seturl' was used to configure.")
print(" 3. All devices have the same modem config. (i.e., '--ch-longfast')")
msgIndex = configType.index
if configType.containing_type.full_name == "LocalConfig":
p = admin_pb2.AdminMessage()
p.get_config_request = msgIndex
self._sendAdmin(p, wantResponse=True, onResponse=self.onResponseRequestSettings)
self._sendAdmin(p, wantResponse=True, onResponse=onResponse)
else:
p = admin_pb2.AdminMessage()
p.get_module_config_request = msgIndex
self._sendAdmin(p, wantResponse=True, onResponse=self.onResponseRequestSettings)
self.iface.waitForAckNak()
self._sendAdmin(p, wantResponse=True, onResponse=onResponse)
if not localOnly:
self.iface.waitForAckNak()
def turnOffEncryptionOnPrimaryChannel(self):
"""Turn off encryption on primary channel."""