# Untitled
#
# avatar
# unknown
# plain_text
# 4 days ago
# 22 kB
# 4
# Indexable
import asyncio
import gc
import logging
import time
from asyncio.streams import StreamReader, StreamWriter
from typing import Final, Optional, Tuple, Union
from pydantic import ValidationError

from iso15118.secc.comm_session_handler import SECCCommunicationSession
from iso15118.secc.controller.ev_data import EVSessionContext15118
from iso15118.secc.controller.interface import EVSEControllerInterface
from iso15118.secc.secc_settings import Config
from iso15118.shared.exceptions import (
    EXIDecodingError,
    FaultyStateImplementationError,
    InvalidV2GTPMessageError,
    MessageProcessingError,
    V2GMessageValidationError,
)
from iso15118.shared.exi_codec import EXI
from iso15118.shared.messages.app_protocol import (
    SupportedAppProtocolReq,
    SupportedAppProtocolRes,
)
from iso15118.shared.messages.din_spec.msgdef import V2GMessage as V2GMessageDINSPEC
from iso15118.shared.messages.enums import Protocol, SessionStopAction
from iso15118.shared.messages.iso15118_2.msgdef import V2GMessage as V2GMessageV2
from iso15118.shared.messages.iso15118_20.common_types import (
    V2GMessage as V2GMessageV20,
)
from iso15118.shared.messages.v2gtp import V2GTPMessage
from iso15118.shared.notifications import StopNotification
from iso15118.shared.states import Pause, Terminate

from core.communication_injector.abstract_communication_injector import (
    AbstractCommunicationInjector,
)
from core.communication_listener.abstract_hlc_session_listener import (
    AbstracHlcSessionListener,
)
from core.communication_listener.utils import process_message_to_notify

# Module-level logger, named after this module.
_LOGGER = logging.getLogger(__name__)
# Maximum number of bytes requested from the stream reader per read in
# HlcSession.rcv_loop.
# The biggest message is the Certificate Installation Response,
# which is estimated to be maximum between 5k to 6k
# NOTE(review): the constant is named "REQ" while the sizing rationale above
# refers to a response message - confirm which direction this is sized for.
# TODO check if that still holds with -20 (e.g. cross certs)
_BIGGEST_REQ_MESSAGE_PAYLOAD_SIZE: Final[int] = 7000


class HlcSession(SECCCommunicationSession):
    def __init__(
        self,
        transport: Tuple[StreamReader, StreamWriter],
        session_handler_queue: asyncio.Queue,
        config: Config,
        evse_controller: EVSEControllerInterface,
        evse_id: str,
        session_listener: AbstracHlcSessionListener,
        comm_injector: AbstractCommunicationInjector,
        ev_session_context: Optional[EVSessionContext15118],
    ):
        """Initialise the SECC session and attach the listener and injector.

        Args:
            transport: Reader/writer pair, forwarded to the base class.
            session_handler_queue: Queue forwarded to the base class.
            config: SECC configuration, forwarded to the base class.
            evse_controller: EVSE controller, forwarded to the base class.
            evse_id: EVSE identifier, forwarded to the base class.
            session_listener: Receives state changes, messages, exceptions
                and EV data reported by this session.
            comm_injector: Supplies injected response configuration/data.
            ev_session_context: Optional EV session context, forwarded to
                the base class.
        """
        super().__init__(
            transport,
            session_handler_queue,
            config,
            evse_controller,
            evse_id,
            ev_session_context,
        )
        self.comm_injector: AbstractCommunicationInjector = comm_injector
        self.session_listener: AbstracHlcSessionListener = session_listener
        # rcv_loop keeps receiving for as long as this flag is true.
        self.is_looping: bool = True
        # Written by rcv_loop; passed to the listener with each received message.
        self.rcvd_msg_time: Optional[float] = None

        # Report the state the base class initialised the session with.
        initial_state_name = str(self.current_state)
        self.session_listener.notify_hlc_state_change(initial_state_name)

    async def process_message(self, message: bytes):

        v2gtp_msg = await self._v2g_payload_extraction(message)

        decoded_message, decoded_msg_protocol = (
            await self._get_decoded_message_and_protocol(v2gtp_msg)
        )

        await self._notify_hlc_rcvd_message_and_current_state_process_msg(
            decoded_message, decoded_msg_protocol, v2gtp_msg
        )

    async def stop(self, reason: str):
        """Pause or terminate the data link, close the TCP writer and notify.

        Session info is saved first when the next state is ``Pause``. The
        controller is informed two seconds after the call, the writer is
        closed three seconds after that, and the listener is told the session
        ended as the last step.

        Args:
            reason: Text that is logged and passed to the EVSE controller's
                ``session_ended``.
        """
        pausing = self.current_state.next_state == Pause
        if pausing:
            self.save_session_info()
        stop_action = (
            SessionStopAction.PAUSE if pausing else SessionStopAction.TERMINATE
        )

        _LOGGER.info(
            "The data link will %s in 2 seconds and the TCP connection will close in 5 seconds.",
            stop_action,
        )
        _LOGGER.info("Reason: %s", reason)

        await asyncio.sleep(2)
        # Tell the data link layer whether to terminate or pause the connection.
        session = self.comm_session
        if hasattr(session, "evse_controller"):
            controller: EVSEControllerInterface = session.evse_controller  # type: ignore
            await controller.update_data_link(stop_action)
            await controller.session_ended(str(self.current_state), reason)
        elif hasattr(session, "ev_controller"):
            await session.ev_controller.enable_charging(False)  # type: ignore
        _LOGGER.info("%sd the data link", stop_action)

        await asyncio.sleep(3)
        try:
            self.writer.close()
            await self.writer.wait_closed()
        except (asyncio.TimeoutError, ConnectionResetError) as close_error:
            error_text = str(close_error)
            _LOGGER.info(error_text)

            # TOFIX:
            # We need to overhaul and unify the manner exceptions are handled here
            self.session_listener.notify_exception(
                exception_name=close_error.__class__.__name__,
                exception_text=error_text,
            )
        _LOGGER.info("TCP connection closed to peer with address %s", self.peer_name)

        self.session_listener.notify_hlc_session_end()

    async def send(self, message: V2GTPMessage):
        """Write a V2GTP message to the peer and report it to the listener.

        If the communication injector returns a configuration with a
        ``response_delay`` for the current state, that delay is awaited before
        writing. The reported send time covers the delay, the write and the
        drain, and is measured with the monotonic clock so that system clock
        adjustments cannot distort it.

        Args:
            message: The V2GTP message whose bytes are written to the peer.
        """
        started_ns = time.monotonic_ns()
        injector_config = self.comm_injector.get_injected_response_message_config(
            str(self.current_state)
        )
        if injector_config and injector_config.response_delay:
            await asyncio.sleep(injector_config.response_delay)
        self.writer.write(message.to_bytes())
        await self.writer.drain()
        # Nanoseconds -> seconds
        sent_msg_time = (time.monotonic_ns() - started_ns) * 1e-9
        _LOGGER.info("Sent %s", str(self.current_state.message))
        process_message_to_notify(
            protocol=message.protocol,
            message=self.current_state.message,
            time=sent_msg_time,
            timeout=-1.0,
            notify_callback=self.session_listener.notify_hlc_sent_message,
        )

    async def rcv_loop(self, timeout: float):
        """Receive, process and answer messages until the session stops.

        Each iteration waits up to ``timeout`` seconds for data from the peer,
        processes it, and then adopts the timeout the processed state asked
        for. The loop ends once the session has been stopped, which happens
        when the peer closes the connection, the read times out or the
        connection is reset, processing raises, or the next state is
        ``Terminate`` or ``Pause``. In each of those cases ``stop()`` is
        awaited before the stop notification is put on the queue.

        Args:
            timeout: Seconds to wait for the first message.
        """
        while self.is_looping:
            try:
                wait_started_ns = time.monotonic_ns()
                message = await asyncio.wait_for(
                    self.reader.read(_BIGGEST_REQ_MESSAGE_PAYLOAD_SIZE), timeout
                )
                # Seconds spent waiting; reported to the listener together
                # with the received message.
                self.rcvd_msg_time = (time.monotonic_ns() - wait_started_ns) * 1e-9
                if message == b"" and self.reader.at_eof():
                    await self._stop_tcp_connection_using_stop_notification(
                        "TCP peer closed connection"
                    )
                    return
            except (asyncio.TimeoutError, ConnectionResetError) as exc:
                await self._stop_and_report(self.handle_stream_exception(exc, timeout))
                return

            # The garbage collector is switched off while a message is being
            # processed (presumably to avoid collection pauses - confirm) and
            # restored afterwards only if it was on before.
            gc_enabled = gc.isenabled()
            try:
                if gc_enabled:
                    gc.disable()
                if not await self._process_received_message(message):
                    # Session was stopped for Terminate/Pause.
                    return
                timeout = self.timeout
            except Exception as exc:  # pylint: disable=broad-exception-caught
                await self._stop_and_report(self._handle_processing_exception(exc))
                return
            finally:
                if gc_enabled:
                    gc.enable()

    def handle_stream_exception(
        self, exc: Exception, timeout: float
    ) -> StopNotification:
        """Report a failed read and record why the session has to stop.

        Builds the error text, passes it to the listener via
        ``notify_exception`` and stores the resulting StopNotification in
        ``self.stop_reason``. The notification is deliberately not enqueued
        here: the caller awaits ``stop()`` first and enqueues afterwards.

        Args:
            exc: The timeout or connection error raised while reading.
            timeout: The timeout, in seconds, that was used for the read.

        Returns:
            The StopNotification stored in ``self.stop_reason``.
        """
        exc_name = exc.__class__.__name__
        if not isinstance(exc, asyncio.TimeoutError):
            error_msg = f"{exc_name} occurred. {str(exc)}"
        elif self.last_message_sent:
            error_msg = (
                f"{exc_name} occurred. Waited for {timeout} s after sending last message: "
                f"{str(self.last_message_sent)}"
            )
        else:
            error_msg = (
                f"{exc_name} occurred. Waited for {timeout} s. No V2GTP message was previously "
                "sent. This is probably a timeout while waiting for SupportedAppProtocolReq"
            )

        # TOFIX:
        # We need to overhaul and unify the manner exceptions are handled here
        self.session_listener.notify_exception(
            exception_name=exc_name, exception_text=error_msg
        )

        self.stop_reason = StopNotification(False, error_msg, self.peer_name)
        return self.stop_reason

    async def _stop_and_report(self, notification: StopNotification):
        """Stop the session, then put ``notification`` on the handler queue."""
        # Order matters: the notification must only reach the queue once
        # stop() has finished closing the connection.
        await self.stop(reason=notification.reason)
        self.session_handler_queue.put_nowait(notification)

    async def _stop_tcp_connection_using_stop_notification(self, stop_reason: str):
        """Stop the session and enqueue ``StopNotification(False, stop_reason, peer)``."""
        await self._stop_and_report(
            StopNotification(False, stop_reason, self.peer_name)
        )

    async def _process_received_message(self, message: bytes) -> bool:
        """Process one message, send the response and advance the state.

        Args:
            message: Raw bytes read from the peer.

        Returns:
            True when the session continues; ``self.timeout`` then holds the
            timeout to use for the next read. False when the session was
            stopped because the next state is ``Terminate`` or ``Pause``.
        """
        # This will create the values needed for the next state, such as
        # next_state, next_v2gtp_message, next_message_payload_type etc.
        await self.process_message(message)

        if self.current_state.next_v2gtp_msg:
            # next_v2gtp_msg would not be set only if the next state is either
            # Terminate or Pause on the EVCC side
            await self.send(self.current_state.next_v2gtp_msg)
            await self._update_state_info(self.current_state)
            self.session_listener.notify_hlc_state_change(str(self.current_state))

        if self.current_state.next_state in (Terminate, Pause):
            # Enqueue the session's own stop notification rather than a new one.
            await self.stop(reason=self.comm_session.stop_reason.reason)
            self.comm_session.session_handler_queue.put_nowait(
                self.comm_session.stop_reason
            )
            return False

        # Take the timeout from the state that just processed the message,
        # i.e. before go_to_next_state() moves the session on.
        self.timeout = self.current_state.next_msg_timeout
        self.go_to_next_state()
        return True

    def _handle_processing_exception(self, exc: Exception) -> StopNotification:
        """Report a processing failure and record why the session has to stop.

        Passes the exception to the listener via ``notify_exception``, builds
        the stop reason text and stores the resulting StopNotification in
        ``self.stop_reason``. The caller is responsible for awaiting
        ``stop()`` and enqueueing the notification afterwards.

        Args:
            exc: The exception raised while processing a received message.

        Returns:
            The StopNotification stored in ``self.stop_reason``.
        """
        exc_name = exc.__class__.__name__

        # TOFIX:
        # We need to overhaul and unify the manner exceptions are handled here
        self.session_listener.notify_exception(
            exception_name=exc_name, exception_text=str(exc)
        )

        message_name = (
            exc.message_name if isinstance(exc, MessageProcessingError) else ""
        )
        additional_info = ""
        if isinstance(
            exc,
            (FaultyStateImplementationError, EXIDecodingError, InvalidV2GTPMessageError),
        ):
            additional_info = f": {exc}"

        stop_reason = (
            f"{exc_name} occurred while processing message "
            f"{message_name} in state {str(self.current_state)} : {exc}. "
            f"{additional_info}"
        )

        self.stop_reason = StopNotification(False, stop_reason, self.peer_name)
        return self.stop_reason
    
    def _notify_listener_with_ev_data(self):

        self.session_listener.notify_target_current(
            self.evse_controller.ev_data_context.target_current
        )
        self.session_listener.notify_target_voltage(
            self.evse_controller.ev_data_context.target_voltage
        )
        self.session_listener.notify_complete_charging_status(
            bool(self.evse_controller.ev_data_context.present_soc == 100)
        )
        self.session_listener.notify_max_current(
            self.evse_controller.ev_data_context.session_limits.dc_limits.max_charge_current
        )
        self.session_listener.notify_max_power(
            self.evse_controller.ev_data_context.session_limits.dc_limits.max_charge_power
        )
        self.session_listener.notify_max_voltage(
            self.evse_controller.ev_data_context.session_limits.dc_limits.max_voltage
        )
        self.session_listener.notify_present_current(
            self.evse_controller.evse_data_context.present_current
        )
        self.session_listener.notify_present_voltage(
            self.evse_controller.ev_data_context.present_voltage
        )
        # soc: state of charge
        self.session_listener.notify_remaining_time_to_full_soc(
            self.evse_controller.ev_data_context.remaining_time_to_target_soc
        )

    async def _v2g_payload_extraction(self, message: bytes):
        """Step 1: build a V2GTPMessage from the bytes received over TCP.

        Args:
            message: Raw bytes read from the peer.

        Returns:
            The V2GTPMessage created by ``V2GTPMessage.from_bytes``.

        Raises:
            InvalidV2GTPMessageError: Logged with traceback and re-raised
                when the bytes are not a valid V2GTP message.
        """
        try:
            # First extract the V2GMessage payload from the V2GTPMessage ...
            parsed = V2GTPMessage.from_bytes(self.comm_session.protocol, message)
        except InvalidV2GTPMessageError:
            _LOGGER.exception("Incoming TCPPacket is not a valid V2GTPMessage")
            raise
        else:
            self._notify_listener_with_ev_data()
            return parsed

    async def _get_decoded_message_and_protocol(self, v2gtp_msg: V2GTPMessage):
        """Step 2: EXI-decode the payload of a V2GTP message.

        Args:
            v2gtp_msg: The V2GTP message whose payload is decoded.

        Returns:
            A tuple ``(decoded_message, decoded_msg_protocol)``. The first
            item is whatever ``EXI().from_exi`` returned; if that is None an
            error is logged and the tuple is still returned.

        Raises:
            V2GMessageValidationError: Logged and re-raised.
            EXIDecodingError: Logged and re-raised.
        """
        decoded_message: Union[
            SupportedAppProtocolReq,
            SupportedAppProtocolRes,
            V2GMessageV2,
            V2GMessageV20,
            V2GMessageDINSPEC,
            None,
        ] = None
        decoded_msg_protocol: Optional[Protocol] = None
        # Second decode the bytearray into the message
        try:
            decoded_message = EXI().from_exi(
                v2gtp_msg.payload, self.get_exi_ns(v2gtp_msg.payload_type)
            )
            decoded_msg_protocol = v2gtp_msg.protocol

            if hasattr(self.comm_session, "evse_id"):
                # trace() is not a standard logging.Logger method; presumably a
                # custom level registered elsewhere - confirm.
                _LOGGER.trace(  # type: ignore[attr-defined]
                    f"{self.comm_session.evse_id}:::"
                    f"{v2gtp_msg.payload.hex()}:::"
                    f"{self.get_exi_ns(v2gtp_msg.payload_type).value}"
                )

        except V2GMessageValidationError:
            _LOGGER.error(
                "EXI message (ns=%s) where validation failed: %s",
                self.get_exi_ns(v2gtp_msg.payload_type),
                v2gtp_msg.payload.hex(),
            )
            raise
        except EXIDecodingError as exc:
            _LOGGER.exception("%s", exc)
            _LOGGER.error(
                "EXI message (ns=%s) where error occured: %s",
                self.get_exi_ns(v2gtp_msg.payload_type),
                v2gtp_msg.payload.hex(),
            )
            raise

        # Shouldn't happen, but just to be sure (otherwise mypy would complain)
        if decoded_message is None:
            # The two literals are concatenated, so the first one needs the
            # trailing space to keep "None" and "although" apart.
            _LOGGER.error(
                "Unusual error situation: decoded_message is None "
                "although no EXIDecodingError was raised"
            )

        self._notify_listener_with_ev_data()
        return decoded_message, decoded_msg_protocol

    async def _notify_hlc_rcvd_message_and_current_state_process_msg(
        self, decoded_message, decoded_msg_protocol: Protocol, v2gtp_msg: V2GTPMessage
    ):
        """Step 3: report the received message and let the current state process it.

        Args:
            decoded_message: The decoded message from step 2.
            decoded_msg_protocol: Protocol reported alongside the message.
            v2gtp_msg: The V2GTP message; its payload is passed to the state.

        Raises:
            MessageProcessingError, ValidationError, AttributeError: Logged
                with traceback and re-raised.
            FaultyStateImplementationError: Re-raised from processing, or
                raised here when the state produced no next V2GTP message
                although the next state is not ``Terminate``.
        """
        try:
            _LOGGER.info("%s received", str(decoded_message))
            process_message_to_notify(
                protocol=decoded_msg_protocol,
                message=decoded_message,
                time=self.rcvd_msg_time,
                timeout=-1.0,
                notify_callback=self.session_listener.notify_hlc_received_message,
            )
            # Lets the state pass its response through the injector.
            self.current_state.injector_callback = (
                self.comm_injector.inject_data_to_response_message
            )
            await self.current_state.process_message(decoded_message, v2gtp_msg.payload)
        except MessageProcessingError as processing_error:
            _LOGGER.exception(
                "%s while processing %s",
                processing_error.__class__.__name__,
                processing_error.message_name,
            )
            raise
        except FaultyStateImplementationError as state_error:
            _LOGGER.exception("%s: %s", state_error.__class__.__name__, state_error)
            raise
        except ValidationError as validation_error:
            # NOTE(review): raw_errors looks like the pydantic v1 attribute -
            # confirm against the pinned pydantic version.
            _LOGGER.exception(
                "%s: %s",
                validation_error.__class__.__name__,
                validation_error.raw_errors,
            )
            raise
        except AttributeError as attribute_error:
            _LOGGER.exception("%s", attribute_error)
            raise

        response_missing = self.current_state.next_v2gtp_msg is None
        if response_missing and self.current_state.next_state is not Terminate:
            raise FaultyStateImplementationError(
                "Field 'next_v2gtp_msg' is "
                "None but must be set because "
                "next state is not Terminate"
            )
        self._notify_listener_with_ev_data()
# Leave a Comment