Serial Code 3K

 avatar
unknown
plain_text
3 years ago
14 kB
6
Indexable
from enum import Enum, unique
from struct import calcsize, pack, unpack, unpack_from
from threading import Lock, Timer
from time import sleep
from typing import Dict, List, Optional, Union

from PyQt5.QtCore import QThread, pyqtSignal
from PyQt5.QtWidgets import QMessageBox
from serial import Serial, SerialException
from serial.tools import list_ports
from serial.tools.list_ports_common import ListPortInfo


# Enum for pacemaker connection states
@unique
class PacemakerState(Enum):
    NOT_CONNECTED = 1
    CONNECTED = 2
    REGISTERED = 3


# This class is a private class that handles the serial communication with the pacemaker
# https://github.com/pyserial/pyserial/issues/216#issuecomment-369414522
class _SerialHandler(QThread):
    _running: bool
    _buf: bytearray
    _conn: Serial
    _num_bytes_to_read: int
    _sent_data: bytes
    _send_params: bool
    _lock: Lock

    # A signal that's emitted every time we receive ECG data
    ecg_data_update: pyqtSignal = pyqtSignal(tuple, tuple)

    # A signal that's emitted upon param verification with the pacemaker
    params_received: pyqtSignal = pyqtSignal(bool, str)

    # https://docs.python.org/3.7/library/struct.html#format-strings
    _num_floats = 20
    _PARAMS_FMT_STR, _ECG_FMT_STR, _ECG_DATA_STR = "=6BH3BHH3BHBB", f"={_num_floats}f", f"={_num_floats // 2}f"
    _PARAMS_NUM_BYTES, _ECG_NUM_BYTES, _ECG_DATA = calcsize(_PARAMS_FMT_STR), calcsize(_ECG_FMT_STR), calcsize(
        _ECG_DATA_STR)
    _REQUEST_ECG = pack("=B33x", 0x55)
    # _PARAMS_ORDER = ["Pacing Mode", "Lower Rate Limit", "Upper Rate Limit", "Atrial Amplitude", "Atrial Pulse Width",
    #                  "Atrial Sensitivity", "Ventricular Amplitude", "Ventricular Pulse Width",
    #                  "Ventricular Sensitivity", "VRP", "ARP", "PVARP", "Fixed AV Delay", "Maximum Sensor Rate",
    #                  "Reaction Time", "Response Factor", "Recovery Time", "Activity Threshold"]
    _PARAMS_ORDER = ["Lower Rate Limit", "Atrial Amplitude", "Atrial Pulse Width",
                     "Atrial Sensitivity", "ARP", "Ventricular Amplitude", "Ventricular Pulse Width",
                     "Ventricular Sensitivity", "VRP", "Fixed AV Delay", "Activity Threshold",
                     "Reaction Time", "Response Factor", "Recovery Time", "Maximum Sensor Rate", "Pacing Mode"]

    def __init__(self):
        super().__init__()
        print("Serial handler init")

        self._running = False
        self._buf = bytearray()
        self._conn = Serial(baudrate=115200, timeout=10)
        self._num_bytes_to_read = self._ECG_NUM_BYTES + 1
        self._sent_data = bytes()
        self._send_params = False
        self._req_ecg = False
        self.atr = False
        self.vent = False
        self._lock = Lock()  # lock is used to prevent concurrent accessing/modification of variables

        
    # Gets called when the thread starts, overrides method in QThread
    def run(self):
        self._running = True

        while self._running:
            # Check if the serial connection with the pacemaker is open
            if self._conn.is_open:
                try:
                    with self._lock:
                        if self._send_params:  # if we want to send params
                            self._send_params = False
                            self._conn.write(self._sent_data)
                        elif self._req_ecg:
                            self._req_ecg = False
                            Timer(0.05, self.print_data).start()
                except Exception as e:
                    print(e)
                    pass
            elif self._conn.port:
                self._try_to_open_port()
            else:
                sleep(1)

    def print_data(self) -> None:
        req_array = [12, 15, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
        req_bytes = pack("=6BH3BHH3BHBB", *req_array)
        self._conn.write(req_bytes)
        returned = unpack("=HH4BH3BHH3BHBB", self._conn.read(24))
        atr_val = returned[0]/10000 if self.atr else 0
        vent_val = returned[1]/10000 if self.vent else 0
        print(atr_val, vent_val)
        self._req_ecg = self.atr or self.vent
        self.ecg_data_update.emit((atr_val,), (vent_val,))

    # Update the parameters to send to the pacemaker, and enable the send flag
    def send_params_to_pacemaker(self, params_to_send: Dict[str, Union[int, float]]) -> None:
        with self._lock:
            paramarray = [12, 5, *[int(params_to_send[key]) for key in self._PARAMS_ORDER]]
            print('Sending Data:')
            print(paramarray)
            self._conn.write(pack("=6BH3BHH3BHBB", *paramarray))

    # Read the output stream of the pacemaker
    def _readline(self) -> bytearray:
        buf_len: int = len(self._buf)

        # If buffer already contains more than num_bytes_to_read bytes, remove and return the oldest ones from buffer
        if buf_len >= self._num_bytes_to_read:
            r = self._buf[:self._num_bytes_to_read]
            self._buf = self._buf[self._num_bytes_to_read:]
            return r

        # Read serial data and store in buffer until we have num bytes to read bytes, then remove and return those
        while self._running and self._conn.is_open:
            data: Optional[bytes] = self._conn.read(self._num_bytes_to_read)
            buf_len = len(self._buf)

            if buf_len >= self._num_bytes_to_read:
                r = self._buf[:self._num_bytes_to_read]
                self._buf = self._buf[self._num_bytes_to_read:] + data
                return r
            else:
                self._buf.extend(data)

    # Attempt to open serial port with pacemaker
    def _try_to_open_port(self) -> None:
        with self._lock:
            try:
                self._conn.open()
                print("Opened Port")
            except SerialException:
                pass

    # Verify that the params sent to the pacemaker are the ones received
    def _verify_params(self, received_params: bytes) -> None:
        if self._sent_data != bytes(received_params[:self._PARAMS_NUM_BYTES]):
            self.params_received.emit(False, "The received parameters were not the same as the sent ones!\nPlease "
                                             "restart the DCM/Pacemaker or try a different Pacemaker!")
        else:
            self.params_received.emit(True, "Successfully sent parameters!")

    # Stops the thread
    def stop(self) -> None:
        with self._lock:
            self._running = False
            self._conn.close()

    # Set the serial connection port to that of the pacemaker, and clear the buffer
    def start_serial_comm(self, port: str) -> None:
        print(f"Opening serial port {port} with pacemaker")
        self._buf = bytearray()
        with self._lock:
            self._conn.port = port

    # Safely close the serial connection and clear the port
    def stop_serial_comm(self) -> None:
        with self._lock:
            self._conn.close()
            self._conn.port = None


# This class handles the pacemaker connection for the DCM and extends the QThread class to allow for multithreading
class ConnectionHandler(QThread):
    _running: bool
    _device: ListPortInfo
    _devices: List[ListPortInfo]
    _old_devices: List[ListPortInfo]
    _first_serial_num: str
    _current_state: PacemakerState
    _prev_state: PacemakerState
    _wanted_state: PacemakerState
    serial: _SerialHandler

    # A signal that's emitted every time we change state
    connect_status_change: pyqtSignal = pyqtSignal(PacemakerState, str)  # the str is the serial_num and/or a msg

    def __init__(self):
        super().__init__()
        print("Connection handler init")

        self._running = False

        # self._device = ListPortInfo()
        self._devices = self._old_devices = []

        self._first_serial_num = ""

        self._current_state = self._prev_state = self._wanted_state = PacemakerState.NOT_CONNECTED

        # Initialize and start the serial connection handler
        self.serial = _SerialHandler()
        self.serial.start()

    # Gets called when the thread starts, overrides method in QThread
    def run(self):
        self._running = True
        self.connect_status_change.emit(PacemakerState.NOT_CONNECTED, "")

        self._update_state()

        while self._running:
            self._update_state()
            sleep(0.01)

    # Stops the thread and stops the serial conn thread
    def stop(self) -> None:
        self._running = False
        self.serial.stop()

    # State machine for pacemaker connection state. It was implemented like this because it offers us many benefits
    # such as cleaner, easier to read code, ensuring that a pacemaker gets registered only once, handling multiple
    # pacemakers being plugged into the same computer, and handling the New Patient btn presses in a much simpler way.
    def _update_state(self) -> None:
        # Get list of connected COM port devices
        self._devices = self._filter_devices(list_ports.comports())

        added = [dev for dev in self._devices if dev not in self._old_devices]  # difference between new and old
        removed = [dev for dev in self._old_devices if dev not in self._devices]  # difference between old and new

        # Update the current state if its not aligned with the state we want to be in
        if self._current_state != self._wanted_state:
            self._current_state = self._wanted_state

        # We're not connected to any pacemaker
        if self._current_state == PacemakerState.NOT_CONNECTED:
            if len(added) > 0:  # if there is a new device added
                self._device = added[0]

                if self._first_serial_num == "":  # if this is the first device connected, auto-register
                    self._first_serial_num = self._device.serial_number
                    self._wanted_state = PacemakerState.REGISTERED
                elif self._first_serial_num == self._device.serial_number:  # if the first device was replugged in
                    self._wanted_state = PacemakerState.REGISTERED
                else:  # another device is plugged in
                    self._wanted_state = PacemakerState.CONNECTED

        # We're connected to an unregistered pacemaker
        elif self._current_state == PacemakerState.CONNECTED:
            # The only way to go from CONNECTED to REGISTERED is if the New Patient btn is pressed
            if self._prev_state == PacemakerState.NOT_CONNECTED:
                self.connect_status_change.emit(PacemakerState.CONNECTED, f"{self._device.serial_number}, press New "
                                                                          f"Patient to register")
            # Handle a device being removed
            self._handle_removed_device(removed)

        # We're connected to a registered pacemaker
        elif self._current_state == PacemakerState.REGISTERED:
            # If we've just transitioned to REGISTERED, open the serial communication link
            if self._prev_state == PacemakerState.NOT_CONNECTED or self._prev_state == PacemakerState.CONNECTED:
                self.serial.start_serial_comm(self._device.device)
                self.connect_status_change.emit(PacemakerState.REGISTERED, self._device.serial_number)

            # Handle a device being removed
            self._handle_removed_device(removed)

        # Update variables that store previous cycle information
        self._old_devices = self._devices
        self._prev_state = self._current_state

    # Called when the New Patient button is pressed
    def register_device(self) -> None:
        if self._current_state == PacemakerState.CONNECTED:
            self._wanted_state = PacemakerState.REGISTERED
        elif self._device.serial_number:  # at this point, we've already registered the device
            self._show_alert("Already registered this pacemaker!")
        elif len(self._devices) > 0:  # we only connect to 1 device at a time, so the rest are ignored
            self._show_alert("Please unplug and replug the pacemaker you want to connect to!")
        else:
            self._show_alert("Please plug in a pacemaker!")

    # Handles the transition to NOT_CONNECTED if the pacemaker we're connected to is unplugged
    def _handle_removed_device(self, removed: List[ListPortInfo]) -> None:
        if any(self._device.serial_number == dev.serial_number for dev in removed):
            self._wanted_state = PacemakerState.NOT_CONNECTED
            self.connect_status_change.emit(PacemakerState.NOT_CONNECTED, removed[0].serial_number)
            print('Lost Connection, Stopping Port')
            self.serial.stop_serial_comm()

    # Called when the Pace Now button is pressed
    def send_data_to_pacemaker(self, params: Dict[str, Union[int, float]]) -> None:
        if self._current_state == PacemakerState.REGISTERED:
            self.serial.send_params_to_pacemaker(params)
        elif self._current_state == PacemakerState.CONNECTED:
            self._show_alert("Please register the pacemaker first!")
        else:
            self._show_alert("Please plug in a pacemaker!")

    @staticmethod
    def _show_alert(msg: str) -> None:
        """
        Displays an information message with the specified text

        :param msg: the text to show
        """
        qm = QMessageBox()
        QMessageBox.information(qm, "Connection", msg, QMessageBox.Ok, QMessageBox.Ok)

    @staticmethod
    def _filter_devices(data: List[ListPortInfo]) -> List[ListPortInfo]:
        """
        Filter plugged in COM port devices so that we only connect to pacemaker devices
        The SEGGER devices have a Vendor ID of 0x1366 and Product ID of 0x1015

        :param data: list of all plugged in COM port devices
        :return: list of all plugged in pacemaker devices
        """
        return [dev for dev in data if dev.vid == 0x1366 and dev.pid == 0x1015]

    def printgraph(self, atr: bool, vent: bool) -> None:
        self.serial._req_ecg = atr or vent
        self.serial.atr = atr
        self.serial.vent = vent
Editor is loading...