Deperately trying to figure out angular

This commit is contained in:
Cynopolis
2025-04-04 20:11:23 -04:00
parent 185182b92f
commit 9465a3915c
25 changed files with 14892 additions and 0 deletions

18
backend/CommandHandler.py Normal file
View File

@@ -0,0 +1,18 @@
class CommandHandler:
def __init__(self, print_method):
self.commands: dict = {}
self.print = print_method
def register_callback(self, key: str, callback):
self.commands[key.lower()] = callback
def parse_command(self, command : str):
args :list[str] = command.split(" ")
if args[0].lower() in self.commands:
return_message = self.commands[args[0].lower()](args[1:])
if not (return_message is None):
self.print(return_message)
else:
self.print("Command not recognized. Type 'help' for a list of commands.")

View File

@@ -0,0 +1,119 @@
from dataclasses import dataclass
from typing import Optional
@dataclass
class Message:
portnum: str
payload: str
text: str
bitfield: Optional[str] = None
replyId: Optional[str] = None
emoji: Optional[str] = None
@dataclass
class RxPacket:
_from: int
to: int
decoded: Message
id: int
rxTime: int
rxSnr: float
rxRssi: float
hopStart: int
raw: str
fromId: str
toId: str
hopLimit: Optional[int] = None
channel: Optional[int] = None
@dataclass
class Position:
latitudeI: int
longitudeI: int
altitude: float
time: int
locationSource: str
PDOP: int
groundSpeed: int
groundTrack: int
satsInView: int
precisionBits: int
raw: str
latitude: float
longitude: float
@dataclass
class User:
id: str
longName: str
shortName: str
macaddr: str
hwModel: str
publicKey: str
role: Optional[str] = None
raw: Optional[str] = None
@dataclass
class DeviceMetrics:
batteryLevel: float
voltage: float
channelUtilization: float
airUtilTx: float
uptimeSeconds: int
@dataclass
class NodeData:
num: int
user: User
snr: float
lastHeard: int
hopsAway: int
deviceMetrics: Optional[DeviceMetrics] = None
position: Optional[Position] = None
lastReceived: Optional[str] = None
hopLimit: Optional[int] = None
def to_rx_packet(rawPacket: dict) -> RxPacket:
if "from" in rawPacket:
rawPacket["_from"] = rawPacket.pop("from")
parsed_packet = RxPacket(**rawPacket)
parsed_packet.decoded = Message(**parsed_packet.decoded)
return parsed_packet
def to_node_data(rawNode: dict) -> NodeData:
# parse the dictionary into a dataclass
parsed_node = NodeData(**rawNode)
parsed_node.user = User(**parsed_node.user)
parsed_node.deviceMetrics = DeviceMetrics(**parsed_node.deviceMetrics)
return parsed_node
if __name__ == "__main__":
test_packet = {
'from': 541024136,
'to': 4294967295,
'channel': 1,
'decoded':
{
'portnum': 'TEXT_MESSAGE_APP',
'payload': b'Test',
'bitfield': 0,
'text': 'Test'
},
'id': 1489259583,
'rxTime': 1743434610,
'rxSnr': 6.25,
'hopLimit': 3,
'rxRssi': -32,
'hopStart': 3,
'raw':"",
'fromId':"",
'toId':"",
}
test = to_rx_packet(test_packet)
print(test)

View File

@@ -0,0 +1,73 @@
import meshtastic
import meshtastic.serial_interface
from pubsub import pub
from MeshtasticDataClasses import RxPacket,to_rx_packet
from MeshtasticDataClasses import NodeData,to_node_data
from datetime import datetime
class MeshtasticLogger:
def __init__(self, interface: meshtastic.serial_interface.SerialInterface, log_path : str, channel : int = 1, message_received_callback = None):
# Subscribe to the recieve text event topic
pub.subscribe(self.onReceive, "meshtastic.receive.text")
self.interface = interface
self.channel: int = channel
self.log_path: str = log_path
self.message_received_callback = message_received_callback
def _cap_string_length_bytes(self, message : str, max_bytes: int):
'''
Cap the length of a string to be under max_bytes. It will just drop the excess from the string.
'''
capped_message = ""
capped_message_byte_length = 0
for char in message:
next_char_byte_size = len(char.encode('utf-8'))
if capped_message_byte_length + next_char_byte_size <= max_bytes:
capped_message_byte_length += next_char_byte_size
capped_message += char
return capped_message
def _log(self, message: str):
print(message)
with open(self.log_path, "a") as file:
file.write(message)
def send(self, message: str):
self.interface.sendText(message, channelIndex=self.channel)
timestamp: str = datetime.now().strftime("%m-%d %I:%M:%S %p")
self._log(f"{timestamp} You on channel {self.channel}: {message}\n")
def onReceive(self, packet: dict, interface: meshtastic.serial_interface.SerialInterface):
# parse the packet we recieved into a data class so we have nice type hints
try:
parsed_packet: RxPacket = to_rx_packet(packet)
except Exception as e:
self._log(e)
return
# get the current time
timestamp: str = datetime.now().strftime("%m-%d %I:%M:%S %p")
# try to grab the sender's ID. Otherwise it will just be unkown sender
sender_node_name = "Unkown Sender"
if parsed_packet.fromId in interface.nodes:
try:
sender_node: NodeData = to_node_data(interface.nodes[parsed_packet.fromId])
except Exception as e:
self._log(e)
return
sender_node_name = sender_node.user.longName
# construct and _log the message
message = f"{timestamp} {sender_node_name}: On Channel {parsed_packet.channel}, message: {parsed_packet.decoded.text}\n"
self._log(message)
# parrot out the message but make sure the message length doesn't go above 233 bytes
if parsed_packet.channel == 1:
print("Parroting message")
interface.sendText(self._cap_string_length_bytes(message, 233), channelIndex=self.channel)
if not self.message_received_callback is None:
self.message_received_callback(message)

94
backend/app.py Normal file
View File

@@ -0,0 +1,94 @@
from flask import Flask, render_template, request, jsonify
import meshtastic
from MeshtasticLogger import MeshtasticLogger
from CommandHandler import CommandHandler
import os
# -------------------------------
# Initialize data and objects
# -------------------------------
messages = []
app = Flask(__name__)
interface = meshtastic.serial_interface.SerialInterface()
log_path = "logs/mesh_logs.log"
try:
with open(log_path, 'r') as file:
for line in file:
messages.append(line)
except FileNotFoundError:
# make the log directory if it isn't already there
with open(log_path, "w") as file:
file.write("Begin logs")
def web_print(message: str):
with app.app_context():
print(message)
messages.append(message)
get_messages()
command_handler: CommandHandler = CommandHandler(web_print)
logger = MeshtasticLogger(interface, "logs/mesh_logs.log", channel=1, message_received_callback=web_print)
# -------------------------
# Command Callback Handlers
# -------------------------
def help_response(args: list[str]):
return "Commands:\nhelp - show this message\clear - clear the text log\nsend <text> - sends text to all channels\nchannel <number> - sets the current channel (0-7)\n"
def channel_response(args: list[str]):
if len(args) < 1:
return f"Current channel number: {logger.channel}"
channel: int = int(args[0])
if channel > 7 or channel < 0:
return "Channel must be between 0 and 7"
try:
logger.channel = channel
except Exception as e:
return e
return f"Channel set to {channel}"
def send_response(args: list[str]):
if len(args) == 0:
return "Please provide text to send"
command = ""
for arg in args:
command += arg + " "
command = command[:-1] # remove the trailing space
logger.send(command)
return_message = f"Sent: {command}"
return return_message
def clear_response(args):
messages.clear()
return None
# register callbacks
command_handler.register_callback("clear", clear_response)
command_handler.register_callback("help", help_response)
command_handler.register_callback("channel", channel_response)
command_handler.register_callback("send", send_response)
# -----------------------
# Flask Callback Handlers
# -----------------------
@app.route('/')
def index():
return render_template('index.html')
@app.route('/send', methods=['POST'])
def send():
data = request.json
message = data.get("message", "")
command_handler.parse_command(message)
return jsonify({"messages": messages})
@app.route('/messages', methods=['GET'])
def get_messages():
return jsonify({"messages": messages})