Untitled

 avatar
unknown
plain_text
2 months ago
16 kB
3
Indexable
import serial
import tkinter as tk
from tkinter import ttk, messagebox
import asyncio
import serial_asyncio
import serial.tools.list_ports
import threading
import queue
import time  # Import time module for delays


def list_serial_ports():
    """
    List all available serial ports.

    Returns:
        list: A list of available serial port device names.
    """
    ports = serial.tools.list_ports.comports()
    return [port.device for port in ports]


def create_data_string(named_arrays, delimiter="|"):
    """
    Create a string representation of multiple named arrays.

    Parameters:
    named_arrays: A dictionary where keys are names (strings) and values are lists of data.
    delimiter: The string used to separate different arrays (default is '|').

    Returns:
    A string that represents the concatenated values of all input arrays, each prefixed by its name.
    """
    # Combine each named array into a string of the format: "name:data1,data2,data3..."
    parts = [f"{name}:{','.join(map(str, values))}" for name, values in named_arrays.items()]
    # Join all parts with the main delimiter
    data_string = delimiter.join(parts)
    return data_string


boost_values = [0, 3, 6, 10, 15, 20, 30, 40, 60, 80, 100]
fuel_values = [18, 19, 21, 23, 26, 29, 34, 50, 55, 62, 62]


class ECUApp:
    def __init__(self, root):
        self.root = root
        self.root.title("EFC Live by Power Driven Diesel")
        self.root.configure(bg="black")

        # Queue for thread-safe communication from serial thread to Tkinter
        self.queue = queue.Queue()

        # Initialize serial connection variables
        self.serial_thread = None
        self.stop_event = threading.Event()
        self.writer = None  # To store the serial writer

        # Add labels for title
        title_label = tk.Label(
            root, text="EFC Live by Power Driven Diesel", bg="gray", fg="white", font=("Arial", 16, "bold")
        )
        title_label.grid(row=0, column=0, columnspan=2, pady=10)

        # Values Read From ECU Section
        read_label = tk.Label(
            root, text="Values Read From ECU", bg="black", fg="white", font=("Arial", 12, "bold")
        )
        read_label.grid(row=1, column=0, sticky="w", padx=10)

        # Entry widgets for RPM, Boost, and Fuel
        self.rpm_entry = self.create_value_entry(root, "RPM", "2038", row=2)
        self.boost_entry = self.create_value_entry(root, "Boost", "12", row=3)
        self.fuel_entry = self.create_value_entry(root, "Fuel Percent", "34%", row=4)

        # Values Sent To ECU Section
        sent_label = tk.Label(
            root, text="Values Sent To ECU", bg="black", fg="white", font=("Arial", 12, "bold")
        )
        sent_label.grid(row=5, column=0, sticky="w", padx=10, pady=(20, 5))

        boost_label = tk.Label(
            root, text="Boost Table", bg="black", fg="white", font=("Arial", 10)
        )
        boost_label.grid(row=6, column=0, sticky="w", padx=10)

        # Editable Table
        self.create_table(root)

        # Port Picker UI
        port_label = tk.Label(root, text="Select Serial Port:", bg="black", fg="white", font=("Arial", 10))
        port_label.grid(row=9, column=0, sticky="e", padx=10, pady=5)

        self.selected_port = tk.StringVar()
        self.port_dropdown = ttk.Combobox(root, textvariable=self.selected_port, values=list_serial_ports(),
                                          state="readonly")
        self.port_dropdown.grid(row=9, column=1, sticky="w", padx=10)

        # Connect Button
        connect_button = tk.Button(
            root, text="Connect", command=self.connect_serial, bg="gray", fg="white", font=("Arial", 10)
        )
        connect_button.grid(row=10, column=0, columnspan=2, pady=5)

        # Save Button
        save_button = tk.Button(
            root, text="Submit to ECU", command=self.save_data, bg="gray", fg="white", font=("Arial", 12, "bold")
        )
        save_button.grid(row=11, column=0, columnspan=2, pady=10)

        # Status Label
        self.status_label = tk.Label(root, text="Disconnected", bg="black", fg="red", font=("Arial", 10))
        self.status_label.grid(row=12, column=0, columnspan=2, pady=5)

        # Start checking the queue for serial data
        self.root.after(100, self.process_serial_queue)

        # Handle window closing
        self.root.protocol("WM_DELETE_WINDOW", self.on_close)

    def create_value_entry(self, root, label_text, value, row):
        """Create a labeled entry widget and return the entry widget."""
        label = tk.Label(root, text=label_text, bg="black", fg="white", font=("Arial", 10))
        label.grid(row=row, column=0, sticky="e", padx=10, pady=5)
        entry = tk.Entry(root, width=10, bg="gray", fg="white", font=("Arial", 10))
        entry.insert(0, value)
        entry.grid(row=row, column=1, sticky="w", padx=10)
        return entry

    def create_table(self, root):
        columns = ["Boost", "Fuel%"]

        tree_frame = tk.Frame(root, bg="black")
        tree_frame.grid(row=7, column=0, columnspan=2, padx=10, pady=5)

        self.tree = ttk.Treeview(tree_frame, columns=columns, show="headings", height=len(boost_values))
        self.tree.grid(row=0, column=0)

        # Style the Treeview
        style = ttk.Style()
        style.theme_use("clam")
        style.configure("Treeview.Heading", background="gray", foreground="white", font=("Arial", 10, "bold"))
        style.configure("Treeview", background="black", foreground="white", fieldbackground="black", rowheight=25)

        # Define columns
        self.tree.heading("Boost", text="Boost")
        self.tree.heading("Fuel%", text="Fuel%")

        self.tree.column("Boost", width=60, anchor="center")
        self.tree.column("Fuel%", width=60, anchor="center")

        # Insert data into table
        for boost, fuel in zip(boost_values, fuel_values):
            self.tree.insert("", "end", values=(boost, fuel))

        # Add editing capability
        def edit_cell(event):
            selected_items = self.tree.selection()
            if not selected_items:
                return
            selected_item = selected_items[0]
            column = self.tree.identify_column(event.x)
            col_index = int(column.replace("#", "")) - 1

            # Get the selected item's values
            current_values = self.tree.item(selected_item, "values")
            current_value = current_values[col_index]

            # Create an entry widget to edit
            entry = tk.Entry(tree_frame, width=10)
            entry.insert(0, current_value)
            entry.focus()

            # Position entry over the cell
            bbox = self.tree.bbox(selected_item, column)
            if bbox:
                entry.place(x=bbox[0], y=bbox[1])

                # Commit changes on pressing Enter
                def commit_edit(event):
                    nonlocal current_values  # Declare as nonlocal to modify the outer variable
                    new_value = entry.get()
                    try:
                        # Attempt to convert to integer
                        int(new_value)
                        # Update the tree item
                        current_values = list(current_values)  # Now refers to the nonlocal variable
                        current_values[col_index] = new_value
                        self.tree.item(selected_item, values=current_values)
                    except ValueError:
                        messagebox.showerror("Invalid input", "Please enter a valid integer.")
                    finally:
                        entry.destroy()

                # Close entry on focus out
                def close_edit(event=None):
                    entry.destroy()

                entry.bind("<Return>", commit_edit)
                entry.bind("<FocusOut>", close_edit)

        self.tree.bind("<Double-1>", edit_cell)

    def connect_serial(self):
        """
        Initiate connection to the selected serial port.
        """
        port = self.selected_port.get()
        if not port:
            messagebox.showwarning("No Port Selected", "Please select a serial port to connect.")
            return

        if self.serial_thread and self.serial_thread.is_alive():
            messagebox.showinfo("Already Connected", f"Already connected to {port}.")
            return

        # Start the serial thread
        self.serial_thread = threading.Thread(target=self.serial_asyncio_loop, args=(port,), daemon=True)
        self.serial_thread.start()
        self.status_label.config(text=f"Connecting to {port}...", fg="yellow")
        print(f"Starting connection thread for {port}")

    def serial_asyncio_loop(self, port):
        """Run the asyncio event loop for serial communication."""
        self.loop = asyncio.new_event_loop()  # Create a new event loop
        asyncio.set_event_loop(self.loop)
        self.loop.run_until_complete(self.read_from_serial(port, 9600, self.queue))
        self.loop.run_forever()
        self.loop.close()

    async def parse_serial_data(self, data):
        """
        Parse key-value pairs from the serial data.

        Parameters:
            data (str): The raw data string from Arduino.

        Returns:
            dict: Parsed data with keys and integer values.
        """
        result = {}
        try:
            pairs = data.split(",")
            for pair in pairs:
                key, value = pair.split("=")
                # Remove any whitespace and convert to appropriate type
                key = key.strip().lower()
                value = value.strip()
                if key == "fuel":
                    result[key] = int(value)  # Assuming fuel is sent as an integer
                else:
                    result[key] = int(value)
        except ValueError:
            print(f"Invalid data format: {data}")
        return result

    async def read_from_serial(self, port, baudrate, data_queue):
        """
        Read data from the serial port asynchronously and put it into a queue.

        Parameters:
            port (str): The serial port to connect to.
            baudrate (int): The baud rate for serial communication.
            data_queue (queue.Queue): The queue to put the parsed data.
        """
        print(f"Attempting to open serial connection on {port} with baudrate {baudrate}")
        try:
            reader, writer = await serial_asyncio.open_serial_connection(url=port, baudrate=baudrate)
            self.writer = writer  # Store the writer for sending data
            print(f"Connected to {port}")
            self.root.after(0, lambda: self.status_label.config(text=f"Connected to {port}", fg="green"))

            # Wait briefly to allow Arduino to reset
            await asyncio.sleep(2)

            # Flush any initial data
            writer.write(b'\n')
            await writer.drain()

            while not self.stop_event.is_set():
                try:
                    line = await reader.readline()
                    if line:
                        data = line.decode().strip()
                        parsed_data = await self.parse_serial_data(data)
                        if "rpm" in parsed_data and "boost" in parsed_data and "fuel" in parsed_data:
                            rpm_value = parsed_data["rpm"]
                            boost_value = parsed_data["boost"]
                            fuel_value = parsed_data["fuel"]
                            print(f"Received: rpm={rpm_value}, boost={boost_value}, fuel={fuel_value}")
                            data_queue.put((rpm_value, boost_value, fuel_value))
                        else:
                            print(f"Missing keys or invalid data: {data}")
                except Exception as e:
                    print(f"Error reading serial data: {e}")
                await asyncio.sleep(0.1)  # Small delay to prevent overutilization
        except Exception as e:
            print(f"Failed to open serial connection: {e}")
            self.root.after(0, lambda: self.status_label.config(text=f"Connection Failed: {port}", fg="red"))
            messagebox.showerror("Connection Error", f"Failed to open serial connection on {port}.\nError: {e}")

    def process_serial_queue(self):
        """
        Process data from the serial queue and update the GUI.
        This method is called periodically using Tkinter's after() method.
        """
        try:
            while not self.queue.empty():
                rpm, boost, fuel = self.queue.get_nowait()
                self.update_gui_values(rpm, boost, fuel)
        except queue.Empty:
            pass
        self.root.after(100, self.process_serial_queue)

    def update_gui_values(self, rpm_value, boost_value, fuel_value):
        """
        Update the GUI values for RPM, Boost, and Fuel Percent.

        Parameters:
            rpm_value (int): The RPM value received from Arduino.
            boost_value (int): The Boost value received from Arduino.
            fuel_value (int): The Fuel Percent value received from Arduino.
        """
        self.rpm_entry.delete(0, tk.END)
        self.rpm_entry.insert(0, str(rpm_value))

        self.boost_entry.delete(0, tk.END)
        self.boost_entry.insert(0, str(boost_value))

        self.fuel_entry.delete(0, tk.END)
        self.fuel_entry.insert(0, f"{fuel_value}%")

    def save_data(self):
        """
        Save table data to a string and send it to the Arduino.
        """
        boost_values = []
        fuel_values = []
        for row in self.tree.get_children():
            values = self.tree.item(row)["values"]
            boost_values.append(int(values[0]))
            fuel_values.append(int(values[1]))

        print("Updated Boost Values:", boost_values)
        print("Updated Fuel Values:", fuel_values)
        named_arrays = {
            "boost": boost_values,
            "fuel": fuel_values
        }

        data_string = create_data_string(named_arrays)
        print(f"Data to send: {data_string}")

        # Send data asynchronously using the existing writer
        if self.writer:
            try:
                # Use the explicitly stored loop to schedule the write operation
                self.loop.call_soon_threadsafe(self.writer.write, data_string.encode('utf-8') + b'\n')
                asyncio.run_coroutine_threadsafe(self.writer.drain(), self.loop)
                print(f"Sent data: {data_string}")
                messagebox.showinfo("Data Sent", f"Data sent to ECU:\n{data_string}")
            except Exception as e:
                print(f"Failed to send data: {e}")
                messagebox.showerror("Send Error", f"Failed to send data to ECU.\nError: {e}")
        else:
            print("Cannot send data: Serial writer not available.")
            messagebox.showwarning("Send Data Failed", "Cannot send data: Serial connection not established.")

    def on_close(self):
        """Handle closing of the application."""
        print("Closing application...")
        self.stop_event.set()
        if self.serial_thread and self.serial_thread.is_alive():
            # Attempt to stop the asyncio loop
            try:
                asyncio.run_coroutine_threadsafe(self.shutdown_asyncio(), self.loop)
            except Exception as e:
                print(f"Error during shutdown: {e}")
        self.root.destroy()

    async def shutdown_asyncio(self):
        """Shutdown the asyncio event loop."""
        tasks = [t for t in asyncio.all_tasks() if t is not asyncio.current_task()]
        for task in tasks:
            task.cancel()
        await asyncio.gather(*tasks, return_exceptions=True)
        self.loop.stop()


# Create the app
if __name__ == "__main__":
    root = tk.Tk()
    app = ECUApp(root)
    root.mainloop()
Editor is loading...
Leave a Comment