Untitled
"""HLC (high-level communication) session for the SECC side.

Defines :class:`HlcSession`, a ``SECCCommunicationSession`` subclass that
reports received/sent messages, state changes, EV data and exceptions to a
session listener and lets a communication injector delay or alter responses.
"""

import asyncio
import gc
import logging
import time
from asyncio.streams import StreamReader, StreamWriter
from typing import Final, Optional, Tuple, Union

from pydantic import ValidationError
from iso15118.secc.comm_session_handler import SECCCommunicationSession
from iso15118.secc.controller.ev_data import EVSessionContext15118
from iso15118.secc.controller.interface import EVSEControllerInterface
from iso15118.secc.secc_settings import Config
from iso15118.shared.exceptions import (
    EXIDecodingError,
    FaultyStateImplementationError,
    InvalidV2GTPMessageError,
    MessageProcessingError,
    V2GMessageValidationError,
)
from iso15118.shared.exi_codec import EXI
from iso15118.shared.messages.app_protocol import (
    SupportedAppProtocolReq,
    SupportedAppProtocolRes,
)
from iso15118.shared.messages.din_spec.msgdef import V2GMessage as V2GMessageDINSPEC
from iso15118.shared.messages.enums import Protocol, SessionStopAction
from iso15118.shared.messages.iso15118_2.msgdef import V2GMessage as V2GMessageV2
from iso15118.shared.messages.iso15118_20.common_types import (
    V2GMessage as V2GMessageV20,
)
from iso15118.shared.messages.v2gtp import V2GTPMessage
from iso15118.shared.notifications import StopNotification
from iso15118.shared.states import Pause, Terminate

from core.communication_injector.abstract_communication_injector import (
    AbstractCommunicationInjector,
)
from core.communication_listener.abstract_hlc_session_listener import (
    AbstracHlcSessionListener,
)
from core.communication_listener.utils import process_message_to_notify

_LOGGER = logging.getLogger(__name__)

# The biggest message is the Certificate Installation Response,
# which is estimated to be maximum between 5k to 6k
# TODO check if that still holds with -20 (e.g. cross certs)
_BIGGEST_REQ_MESSAGE_PAYLOAD_SIZE: Final[int] = 7000


class HlcSession(SECCCommunicationSession):
    """SECC communication session that mirrors its activity to a listener.

    On top of the base session it:

    * notifies ``session_listener`` about state changes, received and sent
      messages, EV data and exceptions;
    * asks ``comm_injector`` for an optional response delay before sending
      and installs its data-injection callback on the current state.
    """

    def __init__(
        self,
        transport: Tuple[StreamReader, StreamWriter],
        session_handler_queue: asyncio.Queue,
        config: Config,
        evse_controller: EVSEControllerInterface,
        evse_id: str,
        session_listener: AbstracHlcSessionListener,
        comm_injector: AbstractCommunicationInjector,
        ev_session_context: Optional[EVSessionContext15118],
    ) -> None:
        """Initialise the base session and announce the initial state.

        Args:
            transport: ``(reader, writer)`` pair of the TCP connection.
            session_handler_queue: Queue that receives ``StopNotification``
                objects when the session ends.
            config: SECC configuration, forwarded to the base class.
            evse_controller: EVSE controller, forwarded to the base class.
            evse_id: EVSE identifier, forwarded to the base class.
            session_listener: Receiver of all session notifications.
            comm_injector: Source of injected response delays / data.
            ev_session_context: Optional context, forwarded to the base class.
        """
        SECCCommunicationSession.__init__(
            self,
            transport,
            session_handler_queue,
            config,
            evse_controller,
            evse_id,
            ev_session_context,
        )
        self.comm_injector: AbstractCommunicationInjector = comm_injector
        self.session_listener: AbstracHlcSessionListener = session_listener
        self.is_looping: bool = True
        # Duration (in seconds) the last successful read waited for data.
        self.rcvd_msg_time: Optional[float] = None
        self.session_listener.notify_hlc_state_change(str(self.current_state))

    async def process_message(self, message: bytes) -> None:
        """Unpack, EXI-decode and process one incoming TCP payload.

        Runs the three steps in order: V2GTP extraction, EXI decoding, and
        listener notification plus processing by the current state. Any
        exception raised by a step propagates to the caller.
        """
        v2gtp_msg = await self._v2g_payload_extraction(message)
        decoded_message, decoded_msg_protocol = (
            await self._get_decoded_message_and_protocol(v2gtp_msg)
        )
        await self._notify_hlc_rcvd_message_and_current_state_process_msg(
            decoded_message, decoded_msg_protocol, v2gtp_msg
        )

    async def stop(self, reason: str) -> None:
        """Pause or terminate the data link, then close the TCP connection.

        Sleeps 2 s before signalling the data link and a further 3 s before
        closing the writer. The listener is told about the session end, and
        about a timeout / connection reset raised while closing.

        Args:
            reason: Human-readable reason, logged and passed to
                ``session_ended`` of the EVSE controller.
        """
        if self.current_state.next_state == Pause:
            self.save_session_info()
            terminate_or_pause = SessionStopAction.PAUSE
        else:
            terminate_or_pause = SessionStopAction.TERMINATE

        _LOGGER.info(
            "The data link will %s in 2 seconds and the TCP connection will close in 5 seconds.",
            terminate_or_pause,
        )
        _LOGGER.info("Reason: %s", reason)

        await asyncio.sleep(2)
        # Signal data link layer to either terminate or pause the data
        # link connection
        if hasattr(self.comm_session, "evse_controller"):
            evse_controller: EVSEControllerInterface = self.comm_session.evse_controller  # type: ignore
            await evse_controller.update_data_link(terminate_or_pause)
            await evse_controller.session_ended(str(self.current_state), reason)
        elif hasattr(self.comm_session, "ev_controller"):  # type: ignore[empty-body]
            await self.comm_session.ev_controller.enable_charging(False)  # type: ignore
        _LOGGER.info("%sd the data link", terminate_or_pause)

        await asyncio.sleep(3)
        try:
            self.writer.close()
            await self.writer.wait_closed()
        except (asyncio.TimeoutError, ConnectionResetError) as exc:
            _LOGGER.info("%s", exc)
            # TOFIX:
            # We need to overhaul and unify the manner exceptions are handled here
            self.session_listener.notify_exception(
                exception_name=exc.__class__.__name__, exception_text=str(exc)
            )
        _LOGGER.info("TCP connection closed to peer with address %s", self.peer_name)
        self.session_listener.notify_hlc_session_end()

    async def send(self, message: V2GTPMessage) -> None:
        """Write ``message`` to the peer and report it to the listener.

        If the injector supplies a configuration with a ``response_delay``
        for the current state, that delay is awaited first. The reported
        time covers the delay, the write and the drain.
        """
        prev_time = time.time_ns()
        injector_config = self.comm_injector.get_injected_response_message_config(
            str(self.current_state)
        )
        if injector_config and injector_config.response_delay:
            await asyncio.sleep(injector_config.response_delay)

        self.writer.write(message.to_bytes())
        await self.writer.drain()
        sent_msg_time = (time.time_ns() - prev_time) * 1e-9
        _LOGGER.info("Sent %s", str(self.current_state.message))
        process_message_to_notify(
            protocol=message.protocol,
            message=self.current_state.message,
            time=sent_msg_time,
            timeout=-1.0,
            notify_callback=self.session_listener.notify_hlc_sent_message,
        )

    async def rcv_loop(self, timeout: float) -> None:
        """Receive and process messages until the session ends.

        The loop ends when the peer closes the connection, a read times out
        or is reset, processing raises, or the next state is ``Terminate`` /
        ``Pause``. In each of those cases the session is stopped and a
        ``StopNotification`` is put on the session handler queue.

        Args:
            timeout: Seconds to wait for the first message. Later waits use
                the ``next_msg_timeout`` of the state that just ran.
        """
        while self.is_looping:
            try:
                # Keep the raw timestamp local so ``rcvd_msg_time`` only ever
                # holds a duration in seconds, even when the read fails.
                read_started_ns = time.time_ns()
                message = await asyncio.wait_for(
                    self.reader.read(_BIGGEST_REQ_MESSAGE_PAYLOAD_SIZE), timeout
                )
                self.rcvd_msg_time = (time.time_ns() - read_started_ns) * 1e-9

                if message == b"" and self.reader.at_eof():
                    await self._stop_tcp_connection_using_stop_notification(
                        "TCP peer closed connection"
                    )
                    return
            except (asyncio.TimeoutError, ConnectionResetError) as exc:
                await self.handle_stream_exception(exc, timeout)
                return

            # NOTE(review): GC is switched off while a message is processed,
            # presumably to avoid collection pauses on the response path —
            # confirm the intent.
            gc_enabled = gc.isenabled()
            try:
                if gc_enabled:
                    gc.disable()
                keep_receiving = await self._process_received_message(message)
            except Exception as exc:  # pylint: disable=broad-exception-caught
                await self._handle_processing_exception(exc)
                return
            finally:
                if gc_enabled:
                    gc.enable()

            if not keep_receiving:
                return
            # Timeout stored by _process_received_message for the next wait.
            timeout = self.timeout

    async def handle_stream_exception(self, exc: Exception, timeout: float) -> None:
        """End the session after a read timeout or connection reset.

        Builds the error text, notifies the listener, records the
        ``StopNotification`` in ``self.stop_reason``, stops the session and
        finally puts the notification on the session handler queue.

        Args:
            exc: The exception raised while waiting for data.
            timeout: The timeout (seconds) that was in effect for the read.
        """
        exc_name = exc.__class__.__name__
        if isinstance(exc, asyncio.TimeoutError):
            if self.last_message_sent:
                error_msg = (
                    f"{exc_name} occurred. Waited "
                    f"for {timeout} s after sending last message: "
                    f"{str(self.last_message_sent)}"
                )
            else:
                error_msg = (
                    f"{exc_name} occurred. Waited "
                    f"for {timeout} s. No V2GTP message was "
                    "previously sent. This is probably a timeout "
                    "while waiting for SupportedAppProtocolReq"
                )
        else:
            error_msg = f"{exc_name} occurred. {str(exc)}"

        # TOFIX:
        # We need to overhaul and unify the manner exceptions are handled here
        self.session_listener.notify_exception(
            exception_name=exc_name, exception_text=error_msg
        )
        self.stop_reason = StopNotification(False, error_msg, self.peer_name)
        await self._stop_tcp_connection_using_stop_notification(
            error_msg, self.stop_reason
        )

    async def _stop_tcp_connection_using_stop_notification(
        self,
        stop_reason: str,
        notification: Optional[StopNotification] = None,
    ) -> None:
        """Stop the session, then queue a stop notification.

        Args:
            stop_reason: Reason passed to :meth:`stop`.
            notification: Notification to queue. When omitted, an
                unsuccessful ``StopNotification`` is built from
                ``stop_reason`` and the peer name.
        """
        await self.stop(reason=stop_reason)
        if notification is None:
            notification = StopNotification(False, stop_reason, self.peer_name)
        self.session_handler_queue.put_nowait(notification)

    async def _process_received_message(self, message: bytes) -> bool:
        """Process one message, send the response and advance the state.

        Returns:
            ``True`` when the receive loop should wait for another message,
            ``False`` when the session was stopped because the next state is
            ``Terminate`` or ``Pause``.
        """
        # This will create the values needed for the next state, such as
        # next_state, next_v2gtp_message, next_message_payload_type etc.
        await self.process_message(message)

        if self.current_state.next_v2gtp_msg:
            # next_v2gtp_msg would not be set only if the next state is either
            # Terminate or Pause on the EVCC side
            await self.send(self.current_state.next_v2gtp_msg)
            await self._update_state_info(self.current_state)

        # NOTE(review): whether this call belongs inside the ``if`` above
        # could not be recovered from the flattened source — confirm.
        self.session_listener.notify_hlc_state_change(str(self.current_state))

        if self.current_state.next_state in (Terminate, Pause):
            # Queue the session's own stop notification instead of building
            # a new unsuccessful one.
            notification = self.comm_session.stop_reason
            await self._stop_tcp_connection_using_stop_notification(
                notification.reason, notification
            )
            return False

        # Read the timeout before go_to_next_state() runs, so it comes from
        # the state that just processed the message.
        self.timeout = self.current_state.next_msg_timeout
        self.go_to_next_state()
        return True

    async def _handle_processing_exception(self, exc: Exception) -> None:
        """End the session after an exception raised during processing.

        Notifies the listener, records the ``StopNotification`` in
        ``self.stop_reason``, stops the session and queues the notification.
        """
        exc_name = exc.__class__.__name__
        # TOFIX:
        # We need to overhaul and unify the manner exceptions are handled here
        self.session_listener.notify_exception(
            exception_name=exc_name, exception_text=str(exc)
        )

        message_name = ""
        additional_info = ""
        if isinstance(exc, MessageProcessingError):
            message_name = exc.message_name
        if isinstance(
            exc,
            (FaultyStateImplementationError, EXIDecodingError, InvalidV2GTPMessageError),
        ):
            additional_info = f": {exc}"

        stop_reason = (
            f"{exc_name} occurred while processing message "
            f"{message_name} in state {str(self.current_state)} : {exc}. "
            f"{additional_info}"
        )
        self.stop_reason = StopNotification(False, stop_reason, self.peer_name)
        await self._stop_tcp_connection_using_stop_notification(
            stop_reason, self.stop_reason
        )

    def _notify_listener_with_ev_data(self) -> None:
        """Push the current EV / EVSE data-context values to the listener."""
        listener = self.session_listener
        ev_context = self.evse_controller.ev_data_context

        listener.notify_target_current(ev_context.target_current)
        listener.notify_target_voltage(ev_context.target_voltage)
        listener.notify_complete_charging_status(bool(ev_context.present_soc == 100))

        dc_limits = ev_context.session_limits.dc_limits
        listener.notify_max_current(dc_limits.max_charge_current)
        listener.notify_max_power(dc_limits.max_charge_power)
        listener.notify_max_voltage(dc_limits.max_voltage)

        # NOTE(review): present current is read from the EVSE data context
        # but present voltage from the EV data context — confirm this
        # asymmetry is intended.
        listener.notify_present_current(
            self.evse_controller.evse_data_context.present_current
        )
        listener.notify_present_voltage(ev_context.present_voltage)
        # soc: state of charge
        listener.notify_remaining_time_to_full_soc(
            ev_context.remaining_time_to_target_soc
        )

    async def _v2g_payload_extraction(self, message: bytes) -> V2GTPMessage:
        """Step 1: parse the raw bytes into a ``V2GTPMessage``.

        Raises:
            InvalidV2GTPMessageError: Logged and re-raised when the bytes are
                not a valid V2GTP message.
        """
        try:
            # First extract the V2GMessage payload from the V2GTPMessage ...
            v2gtp_msg = V2GTPMessage.from_bytes(self.comm_session.protocol, message)
        except InvalidV2GTPMessageError:
            _LOGGER.exception("Incoming TCPPacket is not a valid V2GTPMessage")
            raise
        self._notify_listener_with_ev_data()
        return v2gtp_msg

    async def _get_decoded_message_and_protocol(self, v2gtp_msg: V2GTPMessage):
        """Step 2: EXI-decode the V2GTP payload.

        Returns:
            ``(decoded_message, protocol)`` where ``protocol`` is taken from
            ``v2gtp_msg``.

        Raises:
            V2GMessageValidationError: Logged and re-raised.
            EXIDecodingError: Logged and re-raised.
        """
        decoded_message: Union[
            SupportedAppProtocolReq,
            SupportedAppProtocolRes,
            V2GMessageV2,
            V2GMessageV20,
            V2GMessageDINSPEC,
            None,
        ] = None
        decoded_msg_protocol: Optional[Protocol] = None

        # Second decode the bytearray into the message
        try:
            decoded_message = EXI().from_exi(
                v2gtp_msg.payload, self.get_exi_ns(v2gtp_msg.payload_type)
            )
            decoded_msg_protocol = v2gtp_msg.protocol
            if hasattr(self.comm_session, "evse_id"):
                _LOGGER.trace(  # type: ignore[attr-defined]
                    f"{self.comm_session.evse_id}:::"
                    f"{v2gtp_msg.payload.hex()}:::"
                    f"{self.get_exi_ns(v2gtp_msg.payload_type).value}"
                )
        except V2GMessageValidationError:
            _LOGGER.error(
                "EXI message (ns=%s) where validation failed: %s",
                self.get_exi_ns(v2gtp_msg.payload_type),
                v2gtp_msg.payload.hex(),
            )
            raise
        except EXIDecodingError as exc:
            _LOGGER.exception("%s", exc)
            _LOGGER.error(
                "EXI message (ns=%s) where error occured: %s",
                self.get_exi_ns(v2gtp_msg.payload_type),
                v2gtp_msg.payload.hex(),
            )
            raise

        # Shouldn't happen, but just to be sure (otherwise mypy would complain)
        if not decoded_message:
            _LOGGER.error(
                "Unusual error situation: decoded_message is None "
                "although no EXIDecodingError was raised"
            )
        self._notify_listener_with_ev_data()
        return decoded_message, decoded_msg_protocol

    async def _notify_hlc_rcvd_message_and_current_state_process_msg(
        self, decoded_message, decoded_msg_protocol: Protocol, v2gtp_msg: V2GTPMessage
    ) -> None:
        """Step 3: report the received message and let the state process it.

        Raises:
            MessageProcessingError, FaultyStateImplementationError,
            ValidationError, AttributeError: Logged and re-raised.
            FaultyStateImplementationError: Also raised when the state left
                ``next_v2gtp_msg`` unset although the next state is not
                ``Terminate``.
        """
        # Third notify hlc with received message and process the message
        # with the current state
        try:
            _LOGGER.info("%s received", str(decoded_message))
            process_message_to_notify(
                protocol=decoded_msg_protocol,
                message=decoded_message,
                time=self.rcvd_msg_time,
                timeout=-1.0,
                notify_callback=self.session_listener.notify_hlc_received_message,
            )
            self.current_state.injector_callback = (
                self.comm_injector.inject_data_to_response_message
            )
            await self.current_state.process_message(decoded_message, v2gtp_msg.payload)
        except MessageProcessingError as exc:
            _LOGGER.exception(
                "%s while processing %s", exc.__class__.__name__, exc.message_name
            )
            raise
        except FaultyStateImplementationError as exc:
            _LOGGER.exception("%s: %s", exc.__class__.__name__, exc)
            raise
        except ValidationError as exc:
            _LOGGER.exception("%s: %s", exc.__class__.__name__, exc.raw_errors)
            raise
        except AttributeError as exc:
            _LOGGER.exception("%s", exc)
            raise

        if (
            self.current_state.next_v2gtp_msg is None
            and self.current_state.next_state is not Terminate
        ):
            raise FaultyStateImplementationError(
                "Field 'next_v2gtp_msg' is "
                "None but must be set because "
                "next state is not Terminate"
            )
        self._notify_listener_with_ev_data()
Leave a Comment