Untitled
unknown
plain_text
a year ago
16 kB
6
Indexable
import serial
import tkinter as tk
from tkinter import ttk, messagebox
import asyncio
import serial_asyncio
import serial.tools.list_ports
import threading
import queue
import time # Import time module for delays
def list_serial_ports():
"""
List all available serial ports.
Returns:
list: A list of available serial port device names.
"""
ports = serial.tools.list_ports.comports()
return [port.device for port in ports]
def create_data_string(named_arrays, delimiter="|"):
"""
Create a string representation of multiple named arrays.
Parameters:
named_arrays: A dictionary where keys are names (strings) and values are lists of data.
delimiter: The string used to separate different arrays (default is '|').
Returns:
A string that represents the concatenated values of all input arrays, each prefixed by its name.
"""
# Combine each named array into a string of the format: "name:data1,data2,data3..."
parts = [f"{name}:{','.join(map(str, values))}" for name, values in named_arrays.items()]
# Join all parts with the main delimiter
data_string = delimiter.join(parts)
return data_string
boost_values = [0, 3, 6, 10, 15, 20, 30, 40, 60, 80, 100]
fuel_values = [18, 19, 21, 23, 26, 29, 34, 50, 55, 62, 62]
class ECUApp:
def __init__(self, root):
self.root = root
self.root.title("EFC Live by Power Driven Diesel")
self.root.configure(bg="black")
# Queue for thread-safe communication from serial thread to Tkinter
self.queue = queue.Queue()
# Initialize serial connection variables
self.serial_thread = None
self.stop_event = threading.Event()
self.writer = None # To store the serial writer
# Add labels for title
title_label = tk.Label(
root, text="EFC Live by Power Driven Diesel", bg="gray", fg="white", font=("Arial", 16, "bold")
)
title_label.grid(row=0, column=0, columnspan=2, pady=10)
# Values Read From ECU Section
read_label = tk.Label(
root, text="Values Read From ECU", bg="black", fg="white", font=("Arial", 12, "bold")
)
read_label.grid(row=1, column=0, sticky="w", padx=10)
# Entry widgets for RPM, Boost, and Fuel
self.rpm_entry = self.create_value_entry(root, "RPM", "2038", row=2)
self.boost_entry = self.create_value_entry(root, "Boost", "12", row=3)
self.fuel_entry = self.create_value_entry(root, "Fuel Percent", "34%", row=4)
# Values Sent To ECU Section
sent_label = tk.Label(
root, text="Values Sent To ECU", bg="black", fg="white", font=("Arial", 12, "bold")
)
sent_label.grid(row=5, column=0, sticky="w", padx=10, pady=(20, 5))
boost_label = tk.Label(
root, text="Boost Table", bg="black", fg="white", font=("Arial", 10)
)
boost_label.grid(row=6, column=0, sticky="w", padx=10)
# Editable Table
self.create_table(root)
# Port Picker UI
port_label = tk.Label(root, text="Select Serial Port:", bg="black", fg="white", font=("Arial", 10))
port_label.grid(row=9, column=0, sticky="e", padx=10, pady=5)
self.selected_port = tk.StringVar()
self.port_dropdown = ttk.Combobox(root, textvariable=self.selected_port, values=list_serial_ports(),
state="readonly")
self.port_dropdown.grid(row=9, column=1, sticky="w", padx=10)
# Connect Button
connect_button = tk.Button(
root, text="Connect", command=self.connect_serial, bg="gray", fg="white", font=("Arial", 10)
)
connect_button.grid(row=10, column=0, columnspan=2, pady=5)
# Save Button
save_button = tk.Button(
root, text="Submit to ECU", command=self.save_data, bg="gray", fg="white", font=("Arial", 12, "bold")
)
save_button.grid(row=11, column=0, columnspan=2, pady=10)
# Status Label
self.status_label = tk.Label(root, text="Disconnected", bg="black", fg="red", font=("Arial", 10))
self.status_label.grid(row=12, column=0, columnspan=2, pady=5)
# Start checking the queue for serial data
self.root.after(100, self.process_serial_queue)
# Handle window closing
self.root.protocol("WM_DELETE_WINDOW", self.on_close)
def create_value_entry(self, root, label_text, value, row):
"""Create a labeled entry widget and return the entry widget."""
label = tk.Label(root, text=label_text, bg="black", fg="white", font=("Arial", 10))
label.grid(row=row, column=0, sticky="e", padx=10, pady=5)
entry = tk.Entry(root, width=10, bg="gray", fg="white", font=("Arial", 10))
entry.insert(0, value)
entry.grid(row=row, column=1, sticky="w", padx=10)
return entry
def create_table(self, root):
columns = ["Boost", "Fuel%"]
tree_frame = tk.Frame(root, bg="black")
tree_frame.grid(row=7, column=0, columnspan=2, padx=10, pady=5)
self.tree = ttk.Treeview(tree_frame, columns=columns, show="headings", height=len(boost_values))
self.tree.grid(row=0, column=0)
# Style the Treeview
style = ttk.Style()
style.theme_use("clam")
style.configure("Treeview.Heading", background="gray", foreground="white", font=("Arial", 10, "bold"))
style.configure("Treeview", background="black", foreground="white", fieldbackground="black", rowheight=25)
# Define columns
self.tree.heading("Boost", text="Boost")
self.tree.heading("Fuel%", text="Fuel%")
self.tree.column("Boost", width=60, anchor="center")
self.tree.column("Fuel%", width=60, anchor="center")
# Insert data into table
for boost, fuel in zip(boost_values, fuel_values):
self.tree.insert("", "end", values=(boost, fuel))
# Add editing capability
def edit_cell(event):
selected_items = self.tree.selection()
if not selected_items:
return
selected_item = selected_items[0]
column = self.tree.identify_column(event.x)
col_index = int(column.replace("#", "")) - 1
# Get the selected item's values
current_values = self.tree.item(selected_item, "values")
current_value = current_values[col_index]
# Create an entry widget to edit
entry = tk.Entry(tree_frame, width=10)
entry.insert(0, current_value)
entry.focus()
# Position entry over the cell
bbox = self.tree.bbox(selected_item, column)
if bbox:
entry.place(x=bbox[0], y=bbox[1])
# Commit changes on pressing Enter
def commit_edit(event):
nonlocal current_values # Declare as nonlocal to modify the outer variable
new_value = entry.get()
try:
# Attempt to convert to integer
int(new_value)
# Update the tree item
current_values = list(current_values) # Now refers to the nonlocal variable
current_values[col_index] = new_value
self.tree.item(selected_item, values=current_values)
except ValueError:
messagebox.showerror("Invalid input", "Please enter a valid integer.")
finally:
entry.destroy()
# Close entry on focus out
def close_edit(event=None):
entry.destroy()
entry.bind("<Return>", commit_edit)
entry.bind("<FocusOut>", close_edit)
self.tree.bind("<Double-1>", edit_cell)
def connect_serial(self):
"""
Initiate connection to the selected serial port.
"""
port = self.selected_port.get()
if not port:
messagebox.showwarning("No Port Selected", "Please select a serial port to connect.")
return
if self.serial_thread and self.serial_thread.is_alive():
messagebox.showinfo("Already Connected", f"Already connected to {port}.")
return
# Start the serial thread
self.serial_thread = threading.Thread(target=self.serial_asyncio_loop, args=(port,), daemon=True)
self.serial_thread.start()
self.status_label.config(text=f"Connecting to {port}...", fg="yellow")
print(f"Starting connection thread for {port}")
def serial_asyncio_loop(self, port):
"""Run the asyncio event loop for serial communication."""
self.loop = asyncio.new_event_loop() # Create a new event loop
asyncio.set_event_loop(self.loop)
self.loop.run_until_complete(self.read_from_serial(port, 9600, self.queue))
self.loop.run_forever()
self.loop.close()
async def parse_serial_data(self, data):
"""
Parse key-value pairs from the serial data.
Parameters:
data (str): The raw data string from Arduino.
Returns:
dict: Parsed data with keys and integer values.
"""
result = {}
try:
pairs = data.split(",")
for pair in pairs:
key, value = pair.split("=")
# Remove any whitespace and convert to appropriate type
key = key.strip().lower()
value = value.strip()
if key == "fuel":
result[key] = int(value) # Assuming fuel is sent as an integer
else:
result[key] = int(value)
except ValueError:
print(f"Invalid data format: {data}")
return result
async def read_from_serial(self, port, baudrate, data_queue):
"""
Read data from the serial port asynchronously and put it into a queue.
Parameters:
port (str): The serial port to connect to.
baudrate (int): The baud rate for serial communication.
data_queue (queue.Queue): The queue to put the parsed data.
"""
print(f"Attempting to open serial connection on {port} with baudrate {baudrate}")
try:
reader, writer = await serial_asyncio.open_serial_connection(url=port, baudrate=baudrate)
self.writer = writer # Store the writer for sending data
print(f"Connected to {port}")
self.root.after(0, lambda: self.status_label.config(text=f"Connected to {port}", fg="green"))
# Wait briefly to allow Arduino to reset
await asyncio.sleep(2)
# Flush any initial data
writer.write(b'\n')
await writer.drain()
while not self.stop_event.is_set():
try:
line = await reader.readline()
if line:
data = line.decode().strip()
parsed_data = await self.parse_serial_data(data)
if "rpm" in parsed_data and "boost" in parsed_data and "fuel" in parsed_data:
rpm_value = parsed_data["rpm"]
boost_value = parsed_data["boost"]
fuel_value = parsed_data["fuel"]
print(f"Received: rpm={rpm_value}, boost={boost_value}, fuel={fuel_value}")
data_queue.put((rpm_value, boost_value, fuel_value))
else:
print(f"Missing keys or invalid data: {data}")
except Exception as e:
print(f"Error reading serial data: {e}")
await asyncio.sleep(0.1) # Small delay to prevent overutilization
except Exception as e:
print(f"Failed to open serial connection: {e}")
self.root.after(0, lambda: self.status_label.config(text=f"Connection Failed: {port}", fg="red"))
messagebox.showerror("Connection Error", f"Failed to open serial connection on {port}.\nError: {e}")
def process_serial_queue(self):
"""
Process data from the serial queue and update the GUI.
This method is called periodically using Tkinter's after() method.
"""
try:
while not self.queue.empty():
rpm, boost, fuel = self.queue.get_nowait()
self.update_gui_values(rpm, boost, fuel)
except queue.Empty:
pass
self.root.after(100, self.process_serial_queue)
def update_gui_values(self, rpm_value, boost_value, fuel_value):
"""
Update the GUI values for RPM, Boost, and Fuel Percent.
Parameters:
rpm_value (int): The RPM value received from Arduino.
boost_value (int): The Boost value received from Arduino.
fuel_value (int): The Fuel Percent value received from Arduino.
"""
self.rpm_entry.delete(0, tk.END)
self.rpm_entry.insert(0, str(rpm_value))
self.boost_entry.delete(0, tk.END)
self.boost_entry.insert(0, str(boost_value))
self.fuel_entry.delete(0, tk.END)
self.fuel_entry.insert(0, f"{fuel_value}%")
def save_data(self):
"""
Save table data to a string and send it to the Arduino.
"""
boost_values = []
fuel_values = []
for row in self.tree.get_children():
values = self.tree.item(row)["values"]
boost_values.append(int(values[0]))
fuel_values.append(int(values[1]))
print("Updated Boost Values:", boost_values)
print("Updated Fuel Values:", fuel_values)
named_arrays = {
"boost": boost_values,
"fuel": fuel_values
}
data_string = create_data_string(named_arrays)
print(f"Data to send: {data_string}")
# Send data asynchronously using the existing writer
if self.writer:
try:
# Use the explicitly stored loop to schedule the write operation
self.loop.call_soon_threadsafe(self.writer.write, data_string.encode('utf-8') + b'\n')
asyncio.run_coroutine_threadsafe(self.writer.drain(), self.loop)
print(f"Sent data: {data_string}")
messagebox.showinfo("Data Sent", f"Data sent to ECU:\n{data_string}")
except Exception as e:
print(f"Failed to send data: {e}")
messagebox.showerror("Send Error", f"Failed to send data to ECU.\nError: {e}")
else:
print("Cannot send data: Serial writer not available.")
messagebox.showwarning("Send Data Failed", "Cannot send data: Serial connection not established.")
def on_close(self):
"""Handle closing of the application."""
print("Closing application...")
self.stop_event.set()
if self.serial_thread and self.serial_thread.is_alive():
# Attempt to stop the asyncio loop
try:
asyncio.run_coroutine_threadsafe(self.shutdown_asyncio(), self.loop)
except Exception as e:
print(f"Error during shutdown: {e}")
self.root.destroy()
async def shutdown_asyncio(self):
"""Shutdown the asyncio event loop."""
tasks = [t for t in asyncio.all_tasks() if t is not asyncio.current_task()]
for task in tasks:
task.cancel()
await asyncio.gather(*tasks, return_exceptions=True)
self.loop.stop()
# Create the app
if __name__ == "__main__":
root = tk.Tk()
app = ECUApp(root)
root.mainloop()
Editor is loading...
Leave a Comment