Serial Code 3K
unknown
plain_text
3 years ago
14 kB
7
Indexable
from enum import Enum, unique
from struct import calcsize, pack, unpack, unpack_from
from threading import Lock, Timer
from time import sleep
from typing import Dict, List, Optional, Union
from PyQt5.QtCore import QThread, pyqtSignal
from PyQt5.QtWidgets import QMessageBox
from serial import Serial, SerialException
from serial.tools import list_ports
from serial.tools.list_ports_common import ListPortInfo
# Enum for pacemaker connection states
@unique
class PacemakerState(Enum):
NOT_CONNECTED = 1
CONNECTED = 2
REGISTERED = 3
# This class is a private class that handles the serial communication with the pacemaker
# https://github.com/pyserial/pyserial/issues/216#issuecomment-369414522
class _SerialHandler(QThread):
_running: bool
_buf: bytearray
_conn: Serial
_num_bytes_to_read: int
_sent_data: bytes
_send_params: bool
_lock: Lock
# A signal that's emitted every time we receive ECG data
ecg_data_update: pyqtSignal = pyqtSignal(tuple, tuple)
# A signal that's emitted upon param verification with the pacemaker
params_received: pyqtSignal = pyqtSignal(bool, str)
# https://docs.python.org/3.7/library/struct.html#format-strings
_num_floats = 20
_PARAMS_FMT_STR, _ECG_FMT_STR, _ECG_DATA_STR = "=6BH3BHH3BHBB", f"={_num_floats}f", f"={_num_floats // 2}f"
_PARAMS_NUM_BYTES, _ECG_NUM_BYTES, _ECG_DATA = calcsize(_PARAMS_FMT_STR), calcsize(_ECG_FMT_STR), calcsize(
_ECG_DATA_STR)
_REQUEST_ECG = pack("=B33x", 0x55)
# _PARAMS_ORDER = ["Pacing Mode", "Lower Rate Limit", "Upper Rate Limit", "Atrial Amplitude", "Atrial Pulse Width",
# "Atrial Sensitivity", "Ventricular Amplitude", "Ventricular Pulse Width",
# "Ventricular Sensitivity", "VRP", "ARP", "PVARP", "Fixed AV Delay", "Maximum Sensor Rate",
# "Reaction Time", "Response Factor", "Recovery Time", "Activity Threshold"]
_PARAMS_ORDER = ["Lower Rate Limit", "Atrial Amplitude", "Atrial Pulse Width",
"Atrial Sensitivity", "ARP", "Ventricular Amplitude", "Ventricular Pulse Width",
"Ventricular Sensitivity", "VRP", "Fixed AV Delay", "Activity Threshold",
"Reaction Time", "Response Factor", "Recovery Time", "Maximum Sensor Rate", "Pacing Mode"]
def __init__(self):
super().__init__()
print("Serial handler init")
self._running = False
self._buf = bytearray()
self._conn = Serial(baudrate=115200, timeout=10)
self._num_bytes_to_read = self._ECG_NUM_BYTES + 1
self._sent_data = bytes()
self._send_params = False
self._req_ecg = False
self.atr = False
self.vent = False
self._lock = Lock() # lock is used to prevent concurrent accessing/modification of variables
# Gets called when the thread starts, overrides method in QThread
def run(self):
self._running = True
while self._running:
# Check if the serial connection with the pacemaker is open
if self._conn.is_open:
try:
with self._lock:
if self._send_params: # if we want to send params
self._send_params = False
self._conn.write(self._sent_data)
elif self._req_ecg:
self._req_ecg = False
Timer(0.05, self.print_data).start()
except Exception as e:
print(e)
pass
elif self._conn.port:
self._try_to_open_port()
else:
sleep(1)
def print_data(self) -> None:
req_array = [12, 15, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
req_bytes = pack("=6BH3BHH3BHBB", *req_array)
self._conn.write(req_bytes)
returned = unpack("=HH4BH3BHH3BHBB", self._conn.read(24))
atr_val = returned[0]/10000 if self.atr else 0
vent_val = returned[1]/10000 if self.vent else 0
print(atr_val, vent_val)
self._req_ecg = self.atr or self.vent
self.ecg_data_update.emit((atr_val,), (vent_val,))
# Update the parameters to send to the pacemaker, and enable the send flag
def send_params_to_pacemaker(self, params_to_send: Dict[str, Union[int, float]]) -> None:
with self._lock:
paramarray = [12, 5, *[int(params_to_send[key]) for key in self._PARAMS_ORDER]]
print('Sending Data:')
print(paramarray)
self._conn.write(pack("=6BH3BHH3BHBB", *paramarray))
# Read the output stream of the pacemaker
def _readline(self) -> bytearray:
buf_len: int = len(self._buf)
# If buffer already contains more than num_bytes_to_read bytes, remove and return the oldest ones from buffer
if buf_len >= self._num_bytes_to_read:
r = self._buf[:self._num_bytes_to_read]
self._buf = self._buf[self._num_bytes_to_read:]
return r
# Read serial data and store in buffer until we have num bytes to read bytes, then remove and return those
while self._running and self._conn.is_open:
data: Optional[bytes] = self._conn.read(self._num_bytes_to_read)
buf_len = len(self._buf)
if buf_len >= self._num_bytes_to_read:
r = self._buf[:self._num_bytes_to_read]
self._buf = self._buf[self._num_bytes_to_read:] + data
return r
else:
self._buf.extend(data)
# Attempt to open serial port with pacemaker
def _try_to_open_port(self) -> None:
with self._lock:
try:
self._conn.open()
print("Opened Port")
except SerialException:
pass
# Verify that the params sent to the pacemaker are the ones received
def _verify_params(self, received_params: bytes) -> None:
if self._sent_data != bytes(received_params[:self._PARAMS_NUM_BYTES]):
self.params_received.emit(False, "The received parameters were not the same as the sent ones!\nPlease "
"restart the DCM/Pacemaker or try a different Pacemaker!")
else:
self.params_received.emit(True, "Successfully sent parameters!")
# Stops the thread
def stop(self) -> None:
with self._lock:
self._running = False
self._conn.close()
# Set the serial connection port to that of the pacemaker, and clear the buffer
def start_serial_comm(self, port: str) -> None:
print(f"Opening serial port {port} with pacemaker")
self._buf = bytearray()
with self._lock:
self._conn.port = port
# Safely close the serial connection and clear the port
def stop_serial_comm(self) -> None:
with self._lock:
self._conn.close()
self._conn.port = None
# This class handles the pacemaker connection for the DCM and extends the QThread class to allow for multithreading
class ConnectionHandler(QThread):
_running: bool
_device: ListPortInfo
_devices: List[ListPortInfo]
_old_devices: List[ListPortInfo]
_first_serial_num: str
_current_state: PacemakerState
_prev_state: PacemakerState
_wanted_state: PacemakerState
serial: _SerialHandler
# A signal that's emitted every time we change state
connect_status_change: pyqtSignal = pyqtSignal(PacemakerState, str) # the str is the serial_num and/or a msg
def __init__(self):
super().__init__()
print("Connection handler init")
self._running = False
# self._device = ListPortInfo()
self._devices = self._old_devices = []
self._first_serial_num = ""
self._current_state = self._prev_state = self._wanted_state = PacemakerState.NOT_CONNECTED
# Initialize and start the serial connection handler
self.serial = _SerialHandler()
self.serial.start()
# Gets called when the thread starts, overrides method in QThread
def run(self):
self._running = True
self.connect_status_change.emit(PacemakerState.NOT_CONNECTED, "")
self._update_state()
while self._running:
self._update_state()
sleep(0.01)
# Stops the thread and stops the serial conn thread
def stop(self) -> None:
self._running = False
self.serial.stop()
# State machine for pacemaker connection state. It was implemented like this because it offers us many benefits
# such as cleaner, easier to read code, ensuring that a pacemaker gets registered only once, handling multiple
# pacemakers being plugged into the same computer, and handling the New Patient btn presses in a much simpler way.
def _update_state(self) -> None:
# Get list of connected COM port devices
self._devices = self._filter_devices(list_ports.comports())
added = [dev for dev in self._devices if dev not in self._old_devices] # difference between new and old
removed = [dev for dev in self._old_devices if dev not in self._devices] # difference between old and new
# Update the current state if its not aligned with the state we want to be in
if self._current_state != self._wanted_state:
self._current_state = self._wanted_state
# We're not connected to any pacemaker
if self._current_state == PacemakerState.NOT_CONNECTED:
if len(added) > 0: # if there is a new device added
self._device = added[0]
if self._first_serial_num == "": # if this is the first device connected, auto-register
self._first_serial_num = self._device.serial_number
self._wanted_state = PacemakerState.REGISTERED
elif self._first_serial_num == self._device.serial_number: # if the first device was replugged in
self._wanted_state = PacemakerState.REGISTERED
else: # another device is plugged in
self._wanted_state = PacemakerState.CONNECTED
# We're connected to an unregistered pacemaker
elif self._current_state == PacemakerState.CONNECTED:
# The only way to go from CONNECTED to REGISTERED is if the New Patient btn is pressed
if self._prev_state == PacemakerState.NOT_CONNECTED:
self.connect_status_change.emit(PacemakerState.CONNECTED, f"{self._device.serial_number}, press New "
f"Patient to register")
# Handle a device being removed
self._handle_removed_device(removed)
# We're connected to a registered pacemaker
elif self._current_state == PacemakerState.REGISTERED:
# If we've just transitioned to REGISTERED, open the serial communication link
if self._prev_state == PacemakerState.NOT_CONNECTED or self._prev_state == PacemakerState.CONNECTED:
self.serial.start_serial_comm(self._device.device)
self.connect_status_change.emit(PacemakerState.REGISTERED, self._device.serial_number)
# Handle a device being removed
self._handle_removed_device(removed)
# Update variables that store previous cycle information
self._old_devices = self._devices
self._prev_state = self._current_state
# Called when the New Patient button is pressed
def register_device(self) -> None:
if self._current_state == PacemakerState.CONNECTED:
self._wanted_state = PacemakerState.REGISTERED
elif self._device.serial_number: # at this point, we've already registered the device
self._show_alert("Already registered this pacemaker!")
elif len(self._devices) > 0: # we only connect to 1 device at a time, so the rest are ignored
self._show_alert("Please unplug and replug the pacemaker you want to connect to!")
else:
self._show_alert("Please plug in a pacemaker!")
# Handles the transition to NOT_CONNECTED if the pacemaker we're connected to is unplugged
def _handle_removed_device(self, removed: List[ListPortInfo]) -> None:
if any(self._device.serial_number == dev.serial_number for dev in removed):
self._wanted_state = PacemakerState.NOT_CONNECTED
self.connect_status_change.emit(PacemakerState.NOT_CONNECTED, removed[0].serial_number)
print('Lost Connection, Stopping Port')
self.serial.stop_serial_comm()
# Called when the Pace Now button is pressed
def send_data_to_pacemaker(self, params: Dict[str, Union[int, float]]) -> None:
if self._current_state == PacemakerState.REGISTERED:
self.serial.send_params_to_pacemaker(params)
elif self._current_state == PacemakerState.CONNECTED:
self._show_alert("Please register the pacemaker first!")
else:
self._show_alert("Please plug in a pacemaker!")
@staticmethod
def _show_alert(msg: str) -> None:
"""
Displays an information message with the specified text
:param msg: the text to show
"""
qm = QMessageBox()
QMessageBox.information(qm, "Connection", msg, QMessageBox.Ok, QMessageBox.Ok)
@staticmethod
def _filter_devices(data: List[ListPortInfo]) -> List[ListPortInfo]:
"""
Filter plugged in COM port devices so that we only connect to pacemaker devices
The SEGGER devices have a Vendor ID of 0x1366 and Product ID of 0x1015
:param data: list of all plugged in COM port devices
:return: list of all plugged in pacemaker devices
"""
return [dev for dev in data if dev.vid == 0x1366 and dev.pid == 0x1015]
def printgraph(self, atr: bool, vent: bool) -> None:
self.serial._req_ecg = atr or vent
self.serial.atr = atr
self.serial.vent = ventEditor is loading...