Untitled

 avatar
unknown
plain_text
10 months ago
22 kB
18
Indexable
import asyncio
import gc
import logging
import time
from asyncio.streams import StreamReader, StreamWriter
from typing import Final, Optional, Tuple, Union
from pydantic import ValidationError

from iso15118.secc.comm_session_handler import SECCCommunicationSession
from iso15118.secc.controller.ev_data import EVSessionContext15118
from iso15118.secc.controller.interface import EVSEControllerInterface
from iso15118.secc.secc_settings import Config
from iso15118.shared.exceptions import (
    EXIDecodingError,
    FaultyStateImplementationError,
    InvalidV2GTPMessageError,
    MessageProcessingError,
    V2GMessageValidationError,
)
from iso15118.shared.exi_codec import EXI
from iso15118.shared.messages.app_protocol import (
    SupportedAppProtocolReq,
    SupportedAppProtocolRes,
)
from iso15118.shared.messages.din_spec.msgdef import V2GMessage as V2GMessageDINSPEC
from iso15118.shared.messages.enums import Protocol, SessionStopAction
from iso15118.shared.messages.iso15118_2.msgdef import V2GMessage as V2GMessageV2
from iso15118.shared.messages.iso15118_20.common_types import (
    V2GMessage as V2GMessageV20,
)
from iso15118.shared.messages.v2gtp import V2GTPMessage
from iso15118.shared.notifications import StopNotification
from iso15118.shared.states import Pause, Terminate

from core.communication_injector.abstract_communication_injector import (
    AbstractCommunicationInjector,
)
from core.communication_listener.abstract_hlc_session_listener import (
    AbstracHlcSessionListener,
)
from core.communication_listener.utils import process_message_to_notify

_LOGGER = logging.getLogger(__name__)
# The biggest message is the Certificate Installation Response,
# which is estimated to be maximum between 5k to 6k
# TODO check if that still holds with -20 (e.g. cross certs)
_BIGGEST_REQ_MESSAGE_PAYLOAD_SIZE: Final[int] = 7000


class HlcSession(SECCCommunicationSession):
    def __init__(
        self,
        transport: Tuple[StreamReader, StreamWriter],
        session_handler_queue: asyncio.Queue,
        config: Config,
        evse_controller: EVSEControllerInterface,
        evse_id: str,
        session_listener: AbstracHlcSessionListener,
        comm_injector: AbstractCommunicationInjector,
        ev_session_context: Optional[EVSessionContext15118],
    ):
        SECCCommunicationSession.__init__(
            self,
            transport,
            session_handler_queue,
            config,
            evse_controller,
            evse_id,
            ev_session_context,
        )
        self.comm_injector: AbstractCommunicationInjector = comm_injector
        self.session_listener: AbstracHlcSessionListener = session_listener
        self.is_looping: bool = True
        self.rcvd_msg_time: Optional[float] = None

        self.session_listener.notify_hlc_state_change(str(self.current_state))

    async def process_message(self, message: bytes):

        v2gtp_msg = await self._v2g_payload_extraction(message)

        decoded_message, decoded_msg_protocol = (
            await self._get_decoded_message_and_protocol(v2gtp_msg)
        )

        await self._notify_hlc_rcvd_message_and_current_state_process_msg(
            decoded_message, decoded_msg_protocol, v2gtp_msg
        )

    async def stop(self, reason: str):
        if self.current_state.next_state == Pause:
            self.save_session_info()
            terminate_or_pause = SessionStopAction.PAUSE
        else:
            terminate_or_pause = SessionStopAction.TERMINATE

        _LOGGER.info(
            "The data link will %s in 2 seconds and the TCP connection will close in 5 seconds.",
            terminate_or_pause,
        )
        _LOGGER.info("Reason: %s", reason)

        await asyncio.sleep(2)
        # Signal data link layer to either terminate or pause the data
        # link connection
        if hasattr(self.comm_session, "evse_controller"):
            evse_controller: EVSEControllerInterface = self.comm_session.evse_controller  # type: ignore
            await evse_controller.update_data_link(terminate_or_pause)
            await evse_controller.session_ended(str(self.current_state), reason)
        elif hasattr(self.comm_session, "ev_controller"):
            # type: ignore[empty-body]
            await self.comm_session.ev_controller.enable_charging(False)  # type: ignore
        _LOGGER.info("%sd the data link", terminate_or_pause)
        await asyncio.sleep(3)
        try:
            self.writer.close()
            await self.writer.wait_closed()
        except (asyncio.TimeoutError, ConnectionResetError) as exc:
            _LOGGER.info(str(exc))

            # TOFIX:
            # We need to overhaul and unify the manner exceptions are handled here
            self.session_listener.notify_exception(
                exception_name=exc.__class__.__name__, exception_text=str(exc)
            )
        _LOGGER.info("TCP connection closed to peer with address %s", self.peer_name)

        self.session_listener.notify_hlc_session_end()

    async def send(self, message: V2GTPMessage):
        prev_time = time.time_ns()
        injector_config = self.comm_injector.get_injected_response_message_config(
            str(self.current_state)
        )
        if injector_config and injector_config.response_delay:
            delay = injector_config.response_delay
            await asyncio.sleep(delay)
        self.writer.write(message.to_bytes())
        await self.writer.drain()
        sent_msg_time = (time.time_ns() - prev_time) * 1e-9
        _LOGGER.info("Sent %s", str(self.current_state.message))
        process_message_to_notify(
            protocol=message.protocol,
            message=self.current_state.message,
            time=sent_msg_time,
            timeout=-1.0,
            notify_callback=self.session_listener.notify_hlc_sent_message,
        )

    '''async def rcv_loop(self, timeout: float):
        while self.is_looping:
            try:
                self.rcvd_msg_time = time.time_ns()
                message = await asyncio.wait_for(
                    self.reader.read(_BIGGEST_REQ_MESSAGE_PAYLOAD_SIZE), timeout
                )
                self.rcvd_msg_time = (time.time_ns() - self.rcvd_msg_time) * 1e-9
                if message == b"" and self.reader.at_eof():
                    stop_reason: str = "TCP peer closed connection"
                    await self.stop(reason=stop_reason)
                    self.session_handler_queue.put_nowait(
                        StopNotification(
                            False,
                            stop_reason,
                            self.peer_name,
                        )
                    )
                    return
            except (asyncio.TimeoutError, ConnectionResetError) as exc:
                if isinstance(exc, asyncio.TimeoutError):
                    if self.last_message_sent:
                        error_msg = (
                            f"{exc.__class__.__name__} occurred. Waited "
                            f"for {timeout} s after sending last message: "
                            f"{str(self.last_message_sent)}"
                        )
                    else:
                        error_msg = (
                            f"{exc.__class__.__name__} occurred. Waited "
                            f"for {timeout} s. No V2GTP message was "
                            "previously sent. This is probably a timeout "
                            f"while waiting for SupportedAppProtocolReq"
                        )
                else:
                    error_msg = f"{exc.__class__.__name__} occurred. {str(exc)}"

                # TOFIX:
                # We need to overhaul and unify the manner exceptions are handled here
                self.session_listener.notify_exception(
                    exception_name=exc.__class__.__name__, exception_text=error_msg
                )

                self.stop_reason = StopNotification(False, error_msg, self.peer_name)

                await self.stop(reason=error_msg)
                self.session_handler_queue.put_nowait(self.stop_reason)
                return
            gc_enabled = gc.isenabled()
            try:
                if gc_enabled:
                    gc.disable()
                # This will create the values needed for the next state, such as
                # next_state, next_v2gtp_message, next_message_payload_type etc.
                await self.process_message(message)
                if self.current_state.next_v2gtp_msg:
                    # next_v2gtp_msg would not be set only if the next state is either
                    # Terminate or Pause on the EVCC side
                    await self.send(self.current_state.next_v2gtp_msg)
                    await self._update_state_info(self.current_state)
                    self.session_listener.notify_hlc_state_change(
                        str(self.current_state)
                    )

                if self.current_state.next_state in (Terminate, Pause):
                    await self.stop(reason=self.comm_session.stop_reason.reason)
                    self.comm_session.session_handler_queue.put_nowait(
                        self.comm_session.stop_reason
                    )
                    return

                timeout = self.current_state.next_msg_timeout
                self.go_to_next_state()
            except Exception as exc:  # pylint: disable=broad-exception-caught
                # TOFIX:
                # We need to overhaul and unify the manner exceptions are handled here
                self.session_listener.notify_exception(
                    exception_name=exc.__class__.__name__, exception_text=str(exc)
                )

                message_name = ""
                additional_info = ""
                if isinstance(exc, MessageProcessingError):
                    message_name = exc.message_name
                if isinstance(exc, FaultyStateImplementationError):
                    additional_info = f": {exc}"
                if isinstance(exc, EXIDecodingError):
                    additional_info = f": {exc}"
                if isinstance(exc, InvalidV2GTPMessageError):
                    additional_info = f": {exc}"

                stop_reason = (
                    f"{exc.__class__.__name__} occurred while processing message "
                    f"{message_name} in state {str(self.current_state)} : {exc}. "
                    f"{additional_info}"
                )

                self.stop_reason = StopNotification(
                    False,
                    stop_reason,
                    self.peer_name,
                )

                await self.stop(stop_reason)
                self.session_handler_queue.put_nowait(self.stop_reason)
                return
            finally:
                if gc_enabled:
                    gc.enable()'''
    #start
    async def rcv_loop(self, timeout: float):
        while self.is_looping:
            try:
                self.rcvd_msg_time = time.time_ns()
                message = await asyncio.wait_for(
                    self.reader.read(_BIGGEST_REQ_MESSAGE_PAYLOAD_SIZE), timeout
                )
                self.rcvd_msg_time = (time.time_ns() - self.rcvd_msg_time) * 1e-9
                if message == b"" and self.reader.at_eof():
                    stop_reason = "TCP peer closed connection"
                    await self._stop_tcp_connection_using_stop_notification(stop_reason)
                    return
            except (asyncio.TimeoutError, ConnectionResetError) as exc:
                self.handle_stream_exception(exc, timeout)
                return
            gc_enabled = gc.isenabled()
            try:
                if gc_enabled:
                    gc.disable()
                # This will create the values needed for the next state, such as
                # next_state, next_v2gtp_message, next_message_payload_type etc.
                await self._process_received_message(message)
            except Exception as exc:  
                self._handle_processing_exception(exc)
                return
            finally:
                if gc_enabled:
                    gc.enable()

    def handle_stream_exception(self, exc: Exception, timeout: float):
        if isinstance(exc, asyncio.TimeoutError):
            if self.last_message_sent:
                error_msg = (
                    f"{exc.__class__.__name__} occurred. Waited for {timeout} s after sending last message: "
                    f"{str(self.last_message_sent)}"
                )
            else:
                error_msg = (
                    f"{exc.__class__.__name__} occurred. Waited for {timeout} s. No V2GTP message was previously "
                    "sent. This is probably a timeout while waiting for SupportedAppProtocolReq"
                )
        else:
            error_msg = f"{exc.__class__.__name__} occurred. {str(exc)}"

        self.session_listener.notify_exception(
            exception_name=exc.__class__.__name__, exception_text=error_msg
        )

        self.stop_reason = StopNotification(False, error_msg, self.peer_name)
        self.session_handler_queue.put_nowait(self.stop_reason)

    async def _stop_tcp_connection_using_stop_notification(self, stop_reason: str):
        await self.stop(reason=stop_reason)
        self.session_handler_queue.put_nowait(
            StopNotification(False, stop_reason, self.peer_name)
        )

    async def _process_received_message(self, message: bytes):
        # This will create the values needed for the next state, such as
        # next_state, next_v2gtp_message, next_message_payload_type etc.
        await self.process_message(message)

        if self.current_state.next_v2gtp_msg:
            # next_v2gtp_msg would not be set only if the next state is either
            # Terminate or Pause on the EVCC side
            await self.send(self.current_state.next_v2gtp_msg)
        await self._update_state_info(self.current_state)
        self.session_listener.notify_hlc_state_change(str(self.current_state))

        if self.current_state.next_state in (Terminate, Pause):
            await self._stop_tcp_connection_using_stop_notification(self.comm_session.stop_reason.reason)
        else:
            self.go_to_next_state()
            # Use self.current_state.next_msg_timeout instead of hardcoded value
            # For simplicity, I am not changing timeout, it should be according to your use case
            self.timeout = self.current_state.next_msg_timeout

    def _handle_processing_exception(self, exc: Exception):
        message_name = ""
        additional_info = ""
        if isinstance(exc, MessageProcessingError):
            message_name = exc.message_name
        if isinstance(exc, FaultyStateImplementationError):
            additional_info = f": {exc}"
        if isinstance(exc, EXIDecodingError):
            additional_info = f": {exc}"
        if isinstance(exc, InvalidV2GTPMessageError):
            additional_info = f": {exc}"

        stop_reason = (
            f"{exc.__class__.__name__} occurred while processing message "
            f"{message_name} in state {str(self.current_state)} : {exc}. "
            f"{additional_info}"
        )

        self.stop_reason = StopNotification(False, stop_reason, self.peer_name)
        self.session_handler_queue.put_nowait(self.stop_reason)
    
    #end
    
    def _notify_listener_with_ev_data(self):

        self.session_listener.notify_target_current(
            self.evse_controller.ev_data_context.target_current
        )
        self.session_listener.notify_target_voltage(
            self.evse_controller.ev_data_context.target_voltage
        )
        self.session_listener.notify_complete_charging_status(
            bool(self.evse_controller.ev_data_context.present_soc == 100)
        )
        self.session_listener.notify_max_current(
            self.evse_controller.ev_data_context.session_limits.dc_limits.max_charge_current
        )
        self.session_listener.notify_max_power(
            self.evse_controller.ev_data_context.session_limits.dc_limits.max_charge_power
        )
        self.session_listener.notify_max_voltage(
            self.evse_controller.ev_data_context.session_limits.dc_limits.max_voltage
        )
        self.session_listener.notify_present_current(
            self.evse_controller.evse_data_context.present_current
        )
        self.session_listener.notify_present_voltage(
            self.evse_controller.ev_data_context.present_voltage
        )
        # soc: state of charge
        self.session_listener.notify_remaining_time_to_full_soc(
            self.evse_controller.ev_data_context.remaining_time_to_target_soc
        )

    async def _v2g_payload_extraction(self, message: bytes):
        # Step 1
        try:
            # First extract the V2GMessage payload from the V2GTPMessage ...
            v2gtp_msg = V2GTPMessage.from_bytes(self.comm_session.protocol, message)
        except InvalidV2GTPMessageError as exc:
            _LOGGER.exception("Incoming TCPPacket is not a valid V2GTPMessage")
            raise exc

        self._notify_listener_with_ev_data()
        return v2gtp_msg

    async def _get_decoded_message_and_protocol(self, v2gtp_msg: V2GTPMessage):
        # Step 2
        decoded_message: Union[
            SupportedAppProtocolReq,
            SupportedAppProtocolRes,
            V2GMessageV2,
            V2GMessageV20,
            V2GMessageDINSPEC,
            None,
        ] = None
        decoded_msg_protocol: Optional[Protocol] = None
        # Second decode the bytearray into the message
        try:
            decoded_message = EXI().from_exi(
                v2gtp_msg.payload, self.get_exi_ns(v2gtp_msg.payload_type)
            )
            decoded_msg_protocol = v2gtp_msg.protocol

            if hasattr(self.comm_session, "evse_id"):
                _LOGGER.trace(  # type: ignore[attr-defined]
                    f"{self.comm_session.evse_id}:::"
                    f"{v2gtp_msg.payload.hex()}:::"
                    f"{self.get_exi_ns(v2gtp_msg.payload_type).value}"
                )

        except V2GMessageValidationError as exc:
            _LOGGER.error(
                "EXI message (ns=%s) where validation failed: %s",
                self.get_exi_ns(v2gtp_msg.payload_type),
                v2gtp_msg.payload.hex(),
            )
            raise exc
        except EXIDecodingError as exc:
            _LOGGER.exception("%s", exc)
            _LOGGER.error(
                "EXI message (ns=%s) where error occured: %s",
                self.get_exi_ns(v2gtp_msg.payload_type),
                v2gtp_msg.payload.hex(),
            )
            raise exc

        # Shouldn't happen, but just to be sure (otherwise mypy would complain)
        if not decoded_message:
            _LOGGER.error(
                "Unusual error situation: decoded_message is None"
                "although no EXIDecodingError was raised"
            )

        self._notify_listener_with_ev_data()
        return decoded_message, decoded_msg_protocol

    async def _notify_hlc_rcvd_message_and_current_state_process_msg(
        self, decoded_message, decoded_msg_protocol: Protocol, v2gtp_msg: V2GTPMessage
    ):
        # Step 3
        # Third notify hlc with recieved message and process the message with the current state
        try:
            _LOGGER.info("%s received", str(decoded_message))
            process_message_to_notify(
                protocol=decoded_msg_protocol,
                message=decoded_message,
                time=self.rcvd_msg_time,
                timeout=-1.0,
                notify_callback=self.session_listener.notify_hlc_received_message,
            )
            self.current_state.injector_callback = (
                self.comm_injector.inject_data_to_response_message
            )
            await self.current_state.process_message(decoded_message, v2gtp_msg.payload)
        except MessageProcessingError as exc:
            _LOGGER.exception(
                "%s while processing %s", exc.__class__.__name__, exc.message_name
            )
            raise exc
        except FaultyStateImplementationError as exc:
            _LOGGER.exception("%s: %s", exc.__class__.__name__, exc)
            raise exc
        except ValidationError as exc:
            _LOGGER.exception("%s: %s", exc.__class__.__name__, exc.raw_errors)
            raise exc
        except AttributeError as exc:
            _LOGGER.exception("%s", exc)
            raise exc

        if (
            self.current_state.next_v2gtp_msg is None
            and self.current_state.next_state is not Terminate
        ):
            raise FaultyStateImplementationError(
                "Field 'next_v2gtp_msg' is "
                "None but must be set because "
                "next state is not Terminate"
            )
        self._notify_listener_with_ev_data()

        return
Editor is loading...
Leave a Comment