Recreating broken commits

This commit is contained in:
Cynopolis
2025-04-01 22:08:06 -04:00
parent dd3a7a986f
commit 895a40a389
8 changed files with 451 additions and 0 deletions

4
.gitignore vendored Normal file
View File

@@ -0,0 +1,4 @@
logs/
meshtastic/
.vscode/
__pycache__/

119
MeshtasticDataClasses.py Normal file
View File

@@ -0,0 +1,119 @@
from dataclasses import dataclass
from typing import Optional
@dataclass
class Message:
portnum: str
payload: str
text: str
bitfield: Optional[str] = None
replyId: Optional[str] = None
emoji: Optional[str] = None
@dataclass
class RxPacket:
_from: int
to: int
decoded: Message
id: int
rxTime: int
rxSnr: float
rxRssi: float
hopStart: int
raw: str
fromId: str
toId: str
hopLimit: Optional[int] = None
channel: Optional[int] = None
@dataclass
class Position:
latitudeI: int
longitudeI: int
altitude: float
time: int
locationSource: str
PDOP: int
groundSpeed: int
groundTrack: int
satsInView: int
precisionBits: int
raw: str
latitude: float
longitude: float
@dataclass
class User:
id: str
longName: str
shortName: str
macaddr: str
hwModel: str
publicKey: str
role: Optional[str] = None
raw: Optional[str] = None
@dataclass
class DeviceMetrics:
batteryLevel: float
voltage: float
channelUtilization: float
airUtilTx: float
uptimeSeconds: int
@dataclass
class NodeData:
num: int
user: User
snr: float
lastHeard: int
hopsAway: int
deviceMetrics: Optional[DeviceMetrics] = None
position: Optional[Position] = None
lastReceived: Optional[str] = None
hopLimit: Optional[int] = None
def to_rx_packet(rawPacket: dict) -> RxPacket:
if "from" in rawPacket:
rawPacket["_from"] = rawPacket.pop("from")
parsed_packet = RxPacket(**rawPacket)
parsed_packet.decoded = Message(**parsed_packet.decoded)
return parsed_packet
def to_node_data(rawNode: dict) -> NodeData:
# parse the dictionary into a dataclass
parsed_node = NodeData(**rawNode)
parsed_node.user = User(**parsed_node.user)
parsed_node.deviceMetrics = DeviceMetrics(**parsed_node.deviceMetrics)
return parsed_node
if __name__ == "__main__":
test_packet = {
'from': 541024136,
'to': 4294967295,
'channel': 1,
'decoded':
{
'portnum': 'TEXT_MESSAGE_APP',
'payload': b'Test',
'bitfield': 0,
'text': 'Test'
},
'id': 1489259583,
'rxTime': 1743434610,
'rxSnr': 6.25,
'hopLimit': 3,
'rxRssi': -32,
'hopStart': 3,
'raw':"",
'fromId':"",
'toId':"",
}
test = to_rx_packet(test_packet)
print(test)

73
MeshtasticLogger.py Normal file
View File

@@ -0,0 +1,73 @@
import meshtastic
import meshtastic.serial_interface
from pubsub import pub
from MeshtasticDataClasses import RxPacket,to_rx_packet
from MeshtasticDataClasses import NodeData,to_node_data
from datetime import datetime
class MeshtasticLogger:
def __init__(self, interface: meshtastic.serial_interface.SerialInterface, log_path : str, channel : int = 1, message_received_callback = None):
# Subscribe to the recieve text event topic
pub.subscribe(self.onReceive, "meshtastic.receive.text")
self.interface = interface
self.channel: int = channel
self.log_path: str = log_path
self.message_received_callback = message_received_callback
def _cap_string_length_bytes(self, message : str, max_bytes: int):
'''
Cap the length of a string to be under max_bytes. It will just drop the excess from the string.
'''
capped_message = ""
capped_message_byte_length = 0
for char in message:
next_char_byte_size = len(char.encode('utf-8'))
if capped_message_byte_length + next_char_byte_size <= max_bytes:
capped_message_byte_length += next_char_byte_size
capped_message += char
return capped_message
def _log(self, message: str):
print(message)
with open(self.log_path, "a") as file:
file.write(message)
def send(self, message: str):
self.interface.sendText(message, channelIndex=self.channel)
timestamp: str = datetime.now().strftime("%m-%d %I:%M:%S %p")
self._log(f"{timestamp} You on channel {self.channel}: {message}\n")
def onReceive(self, packet: dict, interface: meshtastic.serial_interface.SerialInterface):
# parse the packet we recieved into a data class so we have nice type hints
try:
parsed_packet: RxPacket = to_rx_packet(packet)
except Exception as e:
self._log(e)
return
# get the current time
timestamp: str = datetime.now().strftime("%m-%d %I:%M:%S %p")
# try to grab the sender's ID. Otherwise it will just be unkown sender
sender_node_name = "Unkown Sender"
if parsed_packet.fromId in interface.nodes:
try:
sender_node: NodeData = to_node_data(interface.nodes[parsed_packet.fromId])
except Exception as e:
self._log(e)
return
sender_node_name = sender_node.user.longName
# construct and _log the message
message = f"{timestamp} {sender_node_name}: On Channel {parsed_packet.channel}, message: {parsed_packet.decoded.text}\n"
self._log(message)
# parrot out the message but make sure the message length doesn't go above 233 bytes
if parsed_packet.channel == 1:
print("Parroting message")
interface.sendText(self._cap_string_length_bytes(message, 233), channelIndex=self.channel)
if not self.message_received_callback is None:
self.message_received_callback(message)

14
MeshtasticLogger.service Normal file
View File

@@ -0,0 +1,14 @@
[Unit]
Description=A simple chat interface and autoresponder for meshtastic
[Service]
Type=simple
WorkingDirectory=/home/quinn/Projects/meshtastic/
ExecStart=sudo /home/quinn/Projects/meshtastic/meshtastic/bin/python -m flask --app /home/quinn/Projects/meshtastic/app.py run --host=0.0.0.0 --port=80
Restart=always
User=quinn
StandardOutput=append:/home/quinn/Projects/meshtastic/logs/meshtastic_system.log
StandardError=append:/home/quinn/Projects/meshtastic/logs/meshtastic_system_error.log
[Install]
WantedBy=default.target

23
README.md Normal file
View File

@@ -0,0 +1,23 @@
# Create a Virtual Environment
Create the virtual environment and activate it
`python -m venv meshtastic && source meshtastic/bin/activate`
Install the requirements
pip install requirements.txt
# Deploying Service Script Changes
Type the following commands:
`sudo cp /home/quinn/Projects/meshtastic/MeshtasticLogger.service /etc/systemd/system/ && sudo systemctl daemon-reload`
To start the service run
`sudo systemctl start MeshtasticLogger`
You can get the status of the service by running:
`sudo systemctl status MeshtasticLogger`
And you can stop the service by running:
`sudo systemctl stop MeshtasticLogger`

89
app.py Normal file
View File

@@ -0,0 +1,89 @@
from flask import Flask, render_template, request, jsonify
import meshtastic
from MeshtasticLogger import MeshtasticLogger
import os
messages = []
app = Flask(__name__)
interface = meshtastic.serial_interface.SerialInterface()
log_path = "logs/mesh_logs.log"
try:
with open(log_path, 'r') as file:
for line in file:
messages.append(line)
except FileNotFoundError:
# make the log directory if it isn't already there
with open(log_path, "w") as file:
file.write("Begin logs")
def web_print(message: str):
with app.app_context():
print(message)
messages.append(message)
get_messages()
def callback(message: str):
web_print(message)
def help_response():
help_message: str = "Commands:\nhelp - show this message\clear - clear the text log\nsend <text> - sends text to all channels\nchannel <number> - sets the current channel (0-7)\n"
web_print(help_message)
logger = MeshtasticLogger(interface, "logs/mesh_logs.log", channel=1, message_received_callback=callback)
def channel_response(args: list[str]):
if(len(args) < 2):
web_print(f"Current channel number: {logger.channel}")
return
channel: int = int(args[1])
if channel > 7 or channel < 0:
web_print("Channel must be between 0 and 7")
else:
try:
logger.channel = channel
except Exception as e:
web_print(e)
else:
web_print(f"Channel set to {channel}")
def send_response(args: list[str], command: str):
if len(args) < 2:
web_print("Please provide text to send")
else:
logger.send(command[5:])
web_print(f"Sent: {command[5:]}")
def parse_command(command):
args = command.split(" ")
if len(args) < 1:
return True
if args[0] == "clear":
messages.clear()
elif args[0] == "help":
help_response()
elif args[0] == "channel":
channel_response(args)
elif args[0] == "send":
send_response(args, command)
else:
web_print("Command not recognized. Type \'help\' for a list of commands.")
@app.route('/')
def index():
return render_template('index.html')
@app.route('/send', methods=['POST'])
def send():
data = request.json
message = data.get("message", "")
parse_command(message)
return jsonify({"messages": messages})
@app.route('/messages', methods=['GET'])
def get_messages():
return jsonify({"messages": messages})
if __name__ == '__main__':
app.run(host="0.0.0.0", port=80, debug=True)

28
requirements.txt Normal file
View File

@@ -0,0 +1,28 @@
argcomplete==3.6.1
bleak==0.22.3
blinker==1.9.0
certifi==2025.1.31
charset-normalizer==3.4.1
click==8.1.8
dbus-fast==2.44.0
dotmap==1.3.30
Flask==3.1.0
idna==3.10
itsdangerous==2.2.0
Jinja2==3.1.6
MarkupSafe==3.0.2
meshtastic==2.6.0
packaging==24.2
print-color==0.4.6
protobuf==6.30.2
Pypubsub==4.0.3
PyQRCode==1.2.1
pyserial==3.5
pytap2==2.3.0
PyYAML==6.0.2
requests==2.32.3
tabulate==0.9.0
typing_extensions==4.13.0
urllib3==2.3.0
wcwidth==0.2.13
Werkzeug==3.1.3

101
templates/index.html Normal file
View File

@@ -0,0 +1,101 @@
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>Simple Chat</title>
<style>
#inputBox { width: 80%; }
#sendButton { width: 18%; }
#displayBox { width: 100%; height: 200px; margin-top: 10px; }
</style>
</head>
<body>
<input type="text" id="inputBox" placeholder="Enter message">
<button id="sendButton">Send</button>
<textarea id="displayBox" readonly></textarea>
<script>
document.getElementById('sendButton').addEventListener('click', sendMessage);
document.getElementById('inputBox').addEventListener('keypress', function(event) {
if (event.key === 'Enter') {
event.preventDefault(); // Prevents form submission if inside a form
sendMessage();
}
});
function sendMessage() {
let message = document.getElementById('inputBox').value;
fetch('/send', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ message: message })
})
.then(response => response.json())
.then(data => {
document.getElementById('displayBox').value = data.messages.join('\n');
document.getElementById('inputBox').value = '';
displayBox.scrollTop = displayBox.scrollHeight; // auto-scroll to bottom
});
}
function fetchMessages() {
fetch('/messages')
.then(response => response.json())
.then(data => {
let displayBox = document.getElementById('displayBox');
let newMessages = data.messages.join('\n');
if (displayBox.value !== newMessages) { // Only update if there's a change
displayBox.value = newMessages;
displayBox.scrollTop = displayBox.scrollHeight; // Auto-scroll to bottom
}
});
}
// Poll every second for new messages
setInterval(fetchMessages, 1000);
</script>
</body>
<style>
html, body {
height: 100%;
margin: 0;
display: flex;
flex-direction: column;
font-family: Arial, sans-serif;
}
#chatContainer {
display: flex;
flex-direction: column;
flex-grow: 1;
padding: 10px;
}
#inputRow {
display: flex;
gap: 10px;
margin-bottom: 10px;
}
#inputBox {
flex-grow: 0;
padding: 10px;
font-size: 16px;
}
#sendButton {
padding: 10px 10px;
font-size: 16px;
}
#displayBox {
flex-grow: 1; /* Takes up the remaining space */
width: 100%;
resize: none; /* Prevents manual resizing */
padding: 10px;
font-size: 16px;
overflow-y: auto; /* Enables scrolling */
}
</style>
</html>