Untitled
unknown
plain_text
2 months ago
16 kB
3
Indexable
import serial import tkinter as tk from tkinter import ttk, messagebox import asyncio import serial_asyncio import serial.tools.list_ports import threading import queue import time # Import time module for delays def list_serial_ports(): """ List all available serial ports. Returns: list: A list of available serial port device names. """ ports = serial.tools.list_ports.comports() return [port.device for port in ports] def create_data_string(named_arrays, delimiter="|"): """ Create a string representation of multiple named arrays. Parameters: named_arrays: A dictionary where keys are names (strings) and values are lists of data. delimiter: The string used to separate different arrays (default is '|'). Returns: A string that represents the concatenated values of all input arrays, each prefixed by its name. """ # Combine each named array into a string of the format: "name:data1,data2,data3..." parts = [f"{name}:{','.join(map(str, values))}" for name, values in named_arrays.items()] # Join all parts with the main delimiter data_string = delimiter.join(parts) return data_string boost_values = [0, 3, 6, 10, 15, 20, 30, 40, 60, 80, 100] fuel_values = [18, 19, 21, 23, 26, 29, 34, 50, 55, 62, 62] class ECUApp: def __init__(self, root): self.root = root self.root.title("EFC Live by Power Driven Diesel") self.root.configure(bg="black") # Queue for thread-safe communication from serial thread to Tkinter self.queue = queue.Queue() # Initialize serial connection variables self.serial_thread = None self.stop_event = threading.Event() self.writer = None # To store the serial writer # Add labels for title title_label = tk.Label( root, text="EFC Live by Power Driven Diesel", bg="gray", fg="white", font=("Arial", 16, "bold") ) title_label.grid(row=0, column=0, columnspan=2, pady=10) # Values Read From ECU Section read_label = tk.Label( root, text="Values Read From ECU", bg="black", fg="white", font=("Arial", 12, "bold") ) read_label.grid(row=1, column=0, sticky="w", padx=10) # Entry widgets for RPM, Boost, and Fuel self.rpm_entry = self.create_value_entry(root, "RPM", "2038", row=2) self.boost_entry = self.create_value_entry(root, "Boost", "12", row=3) self.fuel_entry = self.create_value_entry(root, "Fuel Percent", "34%", row=4) # Values Sent To ECU Section sent_label = tk.Label( root, text="Values Sent To ECU", bg="black", fg="white", font=("Arial", 12, "bold") ) sent_label.grid(row=5, column=0, sticky="w", padx=10, pady=(20, 5)) boost_label = tk.Label( root, text="Boost Table", bg="black", fg="white", font=("Arial", 10) ) boost_label.grid(row=6, column=0, sticky="w", padx=10) # Editable Table self.create_table(root) # Port Picker UI port_label = tk.Label(root, text="Select Serial Port:", bg="black", fg="white", font=("Arial", 10)) port_label.grid(row=9, column=0, sticky="e", padx=10, pady=5) self.selected_port = tk.StringVar() self.port_dropdown = ttk.Combobox(root, textvariable=self.selected_port, values=list_serial_ports(), state="readonly") self.port_dropdown.grid(row=9, column=1, sticky="w", padx=10) # Connect Button connect_button = tk.Button( root, text="Connect", command=self.connect_serial, bg="gray", fg="white", font=("Arial", 10) ) connect_button.grid(row=10, column=0, columnspan=2, pady=5) # Save Button save_button = tk.Button( root, text="Submit to ECU", command=self.save_data, bg="gray", fg="white", font=("Arial", 12, "bold") ) save_button.grid(row=11, column=0, columnspan=2, pady=10) # Status Label self.status_label = tk.Label(root, text="Disconnected", bg="black", fg="red", font=("Arial", 10)) self.status_label.grid(row=12, column=0, columnspan=2, pady=5) # Start checking the queue for serial data self.root.after(100, self.process_serial_queue) # Handle window closing self.root.protocol("WM_DELETE_WINDOW", self.on_close) def create_value_entry(self, root, label_text, value, row): """Create a labeled entry widget and return the entry widget.""" label = tk.Label(root, text=label_text, bg="black", fg="white", font=("Arial", 10)) label.grid(row=row, column=0, sticky="e", padx=10, pady=5) entry = tk.Entry(root, width=10, bg="gray", fg="white", font=("Arial", 10)) entry.insert(0, value) entry.grid(row=row, column=1, sticky="w", padx=10) return entry def create_table(self, root): columns = ["Boost", "Fuel%"] tree_frame = tk.Frame(root, bg="black") tree_frame.grid(row=7, column=0, columnspan=2, padx=10, pady=5) self.tree = ttk.Treeview(tree_frame, columns=columns, show="headings", height=len(boost_values)) self.tree.grid(row=0, column=0) # Style the Treeview style = ttk.Style() style.theme_use("clam") style.configure("Treeview.Heading", background="gray", foreground="white", font=("Arial", 10, "bold")) style.configure("Treeview", background="black", foreground="white", fieldbackground="black", rowheight=25) # Define columns self.tree.heading("Boost", text="Boost") self.tree.heading("Fuel%", text="Fuel%") self.tree.column("Boost", width=60, anchor="center") self.tree.column("Fuel%", width=60, anchor="center") # Insert data into table for boost, fuel in zip(boost_values, fuel_values): self.tree.insert("", "end", values=(boost, fuel)) # Add editing capability def edit_cell(event): selected_items = self.tree.selection() if not selected_items: return selected_item = selected_items[0] column = self.tree.identify_column(event.x) col_index = int(column.replace("#", "")) - 1 # Get the selected item's values current_values = self.tree.item(selected_item, "values") current_value = current_values[col_index] # Create an entry widget to edit entry = tk.Entry(tree_frame, width=10) entry.insert(0, current_value) entry.focus() # Position entry over the cell bbox = self.tree.bbox(selected_item, column) if bbox: entry.place(x=bbox[0], y=bbox[1]) # Commit changes on pressing Enter def commit_edit(event): nonlocal current_values # Declare as nonlocal to modify the outer variable new_value = entry.get() try: # Attempt to convert to integer int(new_value) # Update the tree item current_values = list(current_values) # Now refers to the nonlocal variable current_values[col_index] = new_value self.tree.item(selected_item, values=current_values) except ValueError: messagebox.showerror("Invalid input", "Please enter a valid integer.") finally: entry.destroy() # Close entry on focus out def close_edit(event=None): entry.destroy() entry.bind("<Return>", commit_edit) entry.bind("<FocusOut>", close_edit) self.tree.bind("<Double-1>", edit_cell) def connect_serial(self): """ Initiate connection to the selected serial port. """ port = self.selected_port.get() if not port: messagebox.showwarning("No Port Selected", "Please select a serial port to connect.") return if self.serial_thread and self.serial_thread.is_alive(): messagebox.showinfo("Already Connected", f"Already connected to {port}.") return # Start the serial thread self.serial_thread = threading.Thread(target=self.serial_asyncio_loop, args=(port,), daemon=True) self.serial_thread.start() self.status_label.config(text=f"Connecting to {port}...", fg="yellow") print(f"Starting connection thread for {port}") def serial_asyncio_loop(self, port): """Run the asyncio event loop for serial communication.""" self.loop = asyncio.new_event_loop() # Create a new event loop asyncio.set_event_loop(self.loop) self.loop.run_until_complete(self.read_from_serial(port, 9600, self.queue)) self.loop.run_forever() self.loop.close() async def parse_serial_data(self, data): """ Parse key-value pairs from the serial data. Parameters: data (str): The raw data string from Arduino. Returns: dict: Parsed data with keys and integer values. """ result = {} try: pairs = data.split(",") for pair in pairs: key, value = pair.split("=") # Remove any whitespace and convert to appropriate type key = key.strip().lower() value = value.strip() if key == "fuel": result[key] = int(value) # Assuming fuel is sent as an integer else: result[key] = int(value) except ValueError: print(f"Invalid data format: {data}") return result async def read_from_serial(self, port, baudrate, data_queue): """ Read data from the serial port asynchronously and put it into a queue. Parameters: port (str): The serial port to connect to. baudrate (int): The baud rate for serial communication. data_queue (queue.Queue): The queue to put the parsed data. """ print(f"Attempting to open serial connection on {port} with baudrate {baudrate}") try: reader, writer = await serial_asyncio.open_serial_connection(url=port, baudrate=baudrate) self.writer = writer # Store the writer for sending data print(f"Connected to {port}") self.root.after(0, lambda: self.status_label.config(text=f"Connected to {port}", fg="green")) # Wait briefly to allow Arduino to reset await asyncio.sleep(2) # Flush any initial data writer.write(b'\n') await writer.drain() while not self.stop_event.is_set(): try: line = await reader.readline() if line: data = line.decode().strip() parsed_data = await self.parse_serial_data(data) if "rpm" in parsed_data and "boost" in parsed_data and "fuel" in parsed_data: rpm_value = parsed_data["rpm"] boost_value = parsed_data["boost"] fuel_value = parsed_data["fuel"] print(f"Received: rpm={rpm_value}, boost={boost_value}, fuel={fuel_value}") data_queue.put((rpm_value, boost_value, fuel_value)) else: print(f"Missing keys or invalid data: {data}") except Exception as e: print(f"Error reading serial data: {e}") await asyncio.sleep(0.1) # Small delay to prevent overutilization except Exception as e: print(f"Failed to open serial connection: {e}") self.root.after(0, lambda: self.status_label.config(text=f"Connection Failed: {port}", fg="red")) messagebox.showerror("Connection Error", f"Failed to open serial connection on {port}.\nError: {e}") def process_serial_queue(self): """ Process data from the serial queue and update the GUI. This method is called periodically using Tkinter's after() method. """ try: while not self.queue.empty(): rpm, boost, fuel = self.queue.get_nowait() self.update_gui_values(rpm, boost, fuel) except queue.Empty: pass self.root.after(100, self.process_serial_queue) def update_gui_values(self, rpm_value, boost_value, fuel_value): """ Update the GUI values for RPM, Boost, and Fuel Percent. Parameters: rpm_value (int): The RPM value received from Arduino. boost_value (int): The Boost value received from Arduino. fuel_value (int): The Fuel Percent value received from Arduino. """ self.rpm_entry.delete(0, tk.END) self.rpm_entry.insert(0, str(rpm_value)) self.boost_entry.delete(0, tk.END) self.boost_entry.insert(0, str(boost_value)) self.fuel_entry.delete(0, tk.END) self.fuel_entry.insert(0, f"{fuel_value}%") def save_data(self): """ Save table data to a string and send it to the Arduino. """ boost_values = [] fuel_values = [] for row in self.tree.get_children(): values = self.tree.item(row)["values"] boost_values.append(int(values[0])) fuel_values.append(int(values[1])) print("Updated Boost Values:", boost_values) print("Updated Fuel Values:", fuel_values) named_arrays = { "boost": boost_values, "fuel": fuel_values } data_string = create_data_string(named_arrays) print(f"Data to send: {data_string}") # Send data asynchronously using the existing writer if self.writer: try: # Use the explicitly stored loop to schedule the write operation self.loop.call_soon_threadsafe(self.writer.write, data_string.encode('utf-8') + b'\n') asyncio.run_coroutine_threadsafe(self.writer.drain(), self.loop) print(f"Sent data: {data_string}") messagebox.showinfo("Data Sent", f"Data sent to ECU:\n{data_string}") except Exception as e: print(f"Failed to send data: {e}") messagebox.showerror("Send Error", f"Failed to send data to ECU.\nError: {e}") else: print("Cannot send data: Serial writer not available.") messagebox.showwarning("Send Data Failed", "Cannot send data: Serial connection not established.") def on_close(self): """Handle closing of the application.""" print("Closing application...") self.stop_event.set() if self.serial_thread and self.serial_thread.is_alive(): # Attempt to stop the asyncio loop try: asyncio.run_coroutine_threadsafe(self.shutdown_asyncio(), self.loop) except Exception as e: print(f"Error during shutdown: {e}") self.root.destroy() async def shutdown_asyncio(self): """Shutdown the asyncio event loop.""" tasks = [t for t in asyncio.all_tasks() if t is not asyncio.current_task()] for task in tasks: task.cancel() await asyncio.gather(*tasks, return_exceptions=True) self.loop.stop() # Create the app if __name__ == "__main__": root = tk.Tk() app = ECUApp(root) root.mainloop()
Editor is loading...
Leave a Comment