Untitled
unknown
plain_text
10 months ago
22 kB
18
Indexable
import asyncio
import gc
import logging
import time
from asyncio.streams import StreamReader, StreamWriter
from typing import Final, Optional, Tuple, Union
from pydantic import ValidationError
from iso15118.secc.comm_session_handler import SECCCommunicationSession
from iso15118.secc.controller.ev_data import EVSessionContext15118
from iso15118.secc.controller.interface import EVSEControllerInterface
from iso15118.secc.secc_settings import Config
from iso15118.shared.exceptions import (
EXIDecodingError,
FaultyStateImplementationError,
InvalidV2GTPMessageError,
MessageProcessingError,
V2GMessageValidationError,
)
from iso15118.shared.exi_codec import EXI
from iso15118.shared.messages.app_protocol import (
SupportedAppProtocolReq,
SupportedAppProtocolRes,
)
from iso15118.shared.messages.din_spec.msgdef import V2GMessage as V2GMessageDINSPEC
from iso15118.shared.messages.enums import Protocol, SessionStopAction
from iso15118.shared.messages.iso15118_2.msgdef import V2GMessage as V2GMessageV2
from iso15118.shared.messages.iso15118_20.common_types import (
V2GMessage as V2GMessageV20,
)
from iso15118.shared.messages.v2gtp import V2GTPMessage
from iso15118.shared.notifications import StopNotification
from iso15118.shared.states import Pause, Terminate
from core.communication_injector.abstract_communication_injector import (
AbstractCommunicationInjector,
)
from core.communication_listener.abstract_hlc_session_listener import (
AbstracHlcSessionListener,
)
from core.communication_listener.utils import process_message_to_notify
_LOGGER = logging.getLogger(__name__)
# The biggest message is the Certificate Installation Response,
# which is estimated to be maximum between 5k to 6k
# TODO check if that still holds with -20 (e.g. cross certs)
_BIGGEST_REQ_MESSAGE_PAYLOAD_SIZE: Final[int] = 7000
class HlcSession(SECCCommunicationSession):
def __init__(
self,
transport: Tuple[StreamReader, StreamWriter],
session_handler_queue: asyncio.Queue,
config: Config,
evse_controller: EVSEControllerInterface,
evse_id: str,
session_listener: AbstracHlcSessionListener,
comm_injector: AbstractCommunicationInjector,
ev_session_context: Optional[EVSessionContext15118],
):
SECCCommunicationSession.__init__(
self,
transport,
session_handler_queue,
config,
evse_controller,
evse_id,
ev_session_context,
)
self.comm_injector: AbstractCommunicationInjector = comm_injector
self.session_listener: AbstracHlcSessionListener = session_listener
self.is_looping: bool = True
self.rcvd_msg_time: Optional[float] = None
self.session_listener.notify_hlc_state_change(str(self.current_state))
async def process_message(self, message: bytes):
v2gtp_msg = await self._v2g_payload_extraction(message)
decoded_message, decoded_msg_protocol = (
await self._get_decoded_message_and_protocol(v2gtp_msg)
)
await self._notify_hlc_rcvd_message_and_current_state_process_msg(
decoded_message, decoded_msg_protocol, v2gtp_msg
)
async def stop(self, reason: str):
if self.current_state.next_state == Pause:
self.save_session_info()
terminate_or_pause = SessionStopAction.PAUSE
else:
terminate_or_pause = SessionStopAction.TERMINATE
_LOGGER.info(
"The data link will %s in 2 seconds and the TCP connection will close in 5 seconds.",
terminate_or_pause,
)
_LOGGER.info("Reason: %s", reason)
await asyncio.sleep(2)
# Signal data link layer to either terminate or pause the data
# link connection
if hasattr(self.comm_session, "evse_controller"):
evse_controller: EVSEControllerInterface = self.comm_session.evse_controller # type: ignore
await evse_controller.update_data_link(terminate_or_pause)
await evse_controller.session_ended(str(self.current_state), reason)
elif hasattr(self.comm_session, "ev_controller"):
# type: ignore[empty-body]
await self.comm_session.ev_controller.enable_charging(False) # type: ignore
_LOGGER.info("%sd the data link", terminate_or_pause)
await asyncio.sleep(3)
try:
self.writer.close()
await self.writer.wait_closed()
except (asyncio.TimeoutError, ConnectionResetError) as exc:
_LOGGER.info(str(exc))
# TOFIX:
# We need to overhaul and unify the manner exceptions are handled here
self.session_listener.notify_exception(
exception_name=exc.__class__.__name__, exception_text=str(exc)
)
_LOGGER.info("TCP connection closed to peer with address %s", self.peer_name)
self.session_listener.notify_hlc_session_end()
async def send(self, message: V2GTPMessage):
prev_time = time.time_ns()
injector_config = self.comm_injector.get_injected_response_message_config(
str(self.current_state)
)
if injector_config and injector_config.response_delay:
delay = injector_config.response_delay
await asyncio.sleep(delay)
self.writer.write(message.to_bytes())
await self.writer.drain()
sent_msg_time = (time.time_ns() - prev_time) * 1e-9
_LOGGER.info("Sent %s", str(self.current_state.message))
process_message_to_notify(
protocol=message.protocol,
message=self.current_state.message,
time=sent_msg_time,
timeout=-1.0,
notify_callback=self.session_listener.notify_hlc_sent_message,
)
'''async def rcv_loop(self, timeout: float):
while self.is_looping:
try:
self.rcvd_msg_time = time.time_ns()
message = await asyncio.wait_for(
self.reader.read(_BIGGEST_REQ_MESSAGE_PAYLOAD_SIZE), timeout
)
self.rcvd_msg_time = (time.time_ns() - self.rcvd_msg_time) * 1e-9
if message == b"" and self.reader.at_eof():
stop_reason: str = "TCP peer closed connection"
await self.stop(reason=stop_reason)
self.session_handler_queue.put_nowait(
StopNotification(
False,
stop_reason,
self.peer_name,
)
)
return
except (asyncio.TimeoutError, ConnectionResetError) as exc:
if isinstance(exc, asyncio.TimeoutError):
if self.last_message_sent:
error_msg = (
f"{exc.__class__.__name__} occurred. Waited "
f"for {timeout} s after sending last message: "
f"{str(self.last_message_sent)}"
)
else:
error_msg = (
f"{exc.__class__.__name__} occurred. Waited "
f"for {timeout} s. No V2GTP message was "
"previously sent. This is probably a timeout "
f"while waiting for SupportedAppProtocolReq"
)
else:
error_msg = f"{exc.__class__.__name__} occurred. {str(exc)}"
# TOFIX:
# We need to overhaul and unify the manner exceptions are handled here
self.session_listener.notify_exception(
exception_name=exc.__class__.__name__, exception_text=error_msg
)
self.stop_reason = StopNotification(False, error_msg, self.peer_name)
await self.stop(reason=error_msg)
self.session_handler_queue.put_nowait(self.stop_reason)
return
gc_enabled = gc.isenabled()
try:
if gc_enabled:
gc.disable()
# This will create the values needed for the next state, such as
# next_state, next_v2gtp_message, next_message_payload_type etc.
await self.process_message(message)
if self.current_state.next_v2gtp_msg:
# next_v2gtp_msg would not be set only if the next state is either
# Terminate or Pause on the EVCC side
await self.send(self.current_state.next_v2gtp_msg)
await self._update_state_info(self.current_state)
self.session_listener.notify_hlc_state_change(
str(self.current_state)
)
if self.current_state.next_state in (Terminate, Pause):
await self.stop(reason=self.comm_session.stop_reason.reason)
self.comm_session.session_handler_queue.put_nowait(
self.comm_session.stop_reason
)
return
timeout = self.current_state.next_msg_timeout
self.go_to_next_state()
except Exception as exc: # pylint: disable=broad-exception-caught
# TOFIX:
# We need to overhaul and unify the manner exceptions are handled here
self.session_listener.notify_exception(
exception_name=exc.__class__.__name__, exception_text=str(exc)
)
message_name = ""
additional_info = ""
if isinstance(exc, MessageProcessingError):
message_name = exc.message_name
if isinstance(exc, FaultyStateImplementationError):
additional_info = f": {exc}"
if isinstance(exc, EXIDecodingError):
additional_info = f": {exc}"
if isinstance(exc, InvalidV2GTPMessageError):
additional_info = f": {exc}"
stop_reason = (
f"{exc.__class__.__name__} occurred while processing message "
f"{message_name} in state {str(self.current_state)} : {exc}. "
f"{additional_info}"
)
self.stop_reason = StopNotification(
False,
stop_reason,
self.peer_name,
)
await self.stop(stop_reason)
self.session_handler_queue.put_nowait(self.stop_reason)
return
finally:
if gc_enabled:
gc.enable()'''
#start
async def rcv_loop(self, timeout: float):
while self.is_looping:
try:
self.rcvd_msg_time = time.time_ns()
message = await asyncio.wait_for(
self.reader.read(_BIGGEST_REQ_MESSAGE_PAYLOAD_SIZE), timeout
)
self.rcvd_msg_time = (time.time_ns() - self.rcvd_msg_time) * 1e-9
if message == b"" and self.reader.at_eof():
stop_reason = "TCP peer closed connection"
await self._stop_tcp_connection_using_stop_notification(stop_reason)
return
except (asyncio.TimeoutError, ConnectionResetError) as exc:
self.handle_stream_exception(exc, timeout)
return
gc_enabled = gc.isenabled()
try:
if gc_enabled:
gc.disable()
# This will create the values needed for the next state, such as
# next_state, next_v2gtp_message, next_message_payload_type etc.
await self._process_received_message(message)
except Exception as exc:
self._handle_processing_exception(exc)
return
finally:
if gc_enabled:
gc.enable()
def handle_stream_exception(self, exc: Exception, timeout: float):
if isinstance(exc, asyncio.TimeoutError):
if self.last_message_sent:
error_msg = (
f"{exc.__class__.__name__} occurred. Waited for {timeout} s after sending last message: "
f"{str(self.last_message_sent)}"
)
else:
error_msg = (
f"{exc.__class__.__name__} occurred. Waited for {timeout} s. No V2GTP message was previously "
"sent. This is probably a timeout while waiting for SupportedAppProtocolReq"
)
else:
error_msg = f"{exc.__class__.__name__} occurred. {str(exc)}"
self.session_listener.notify_exception(
exception_name=exc.__class__.__name__, exception_text=error_msg
)
self.stop_reason = StopNotification(False, error_msg, self.peer_name)
self.session_handler_queue.put_nowait(self.stop_reason)
async def _stop_tcp_connection_using_stop_notification(self, stop_reason: str):
await self.stop(reason=stop_reason)
self.session_handler_queue.put_nowait(
StopNotification(False, stop_reason, self.peer_name)
)
async def _process_received_message(self, message: bytes):
# This will create the values needed for the next state, such as
# next_state, next_v2gtp_message, next_message_payload_type etc.
await self.process_message(message)
if self.current_state.next_v2gtp_msg:
# next_v2gtp_msg would not be set only if the next state is either
# Terminate or Pause on the EVCC side
await self.send(self.current_state.next_v2gtp_msg)
await self._update_state_info(self.current_state)
self.session_listener.notify_hlc_state_change(str(self.current_state))
if self.current_state.next_state in (Terminate, Pause):
await self._stop_tcp_connection_using_stop_notification(self.comm_session.stop_reason.reason)
else:
self.go_to_next_state()
# Use self.current_state.next_msg_timeout instead of hardcoded value
# For simplicity, I am not changing timeout, it should be according to your use case
self.timeout = self.current_state.next_msg_timeout
def _handle_processing_exception(self, exc: Exception):
message_name = ""
additional_info = ""
if isinstance(exc, MessageProcessingError):
message_name = exc.message_name
if isinstance(exc, FaultyStateImplementationError):
additional_info = f": {exc}"
if isinstance(exc, EXIDecodingError):
additional_info = f": {exc}"
if isinstance(exc, InvalidV2GTPMessageError):
additional_info = f": {exc}"
stop_reason = (
f"{exc.__class__.__name__} occurred while processing message "
f"{message_name} in state {str(self.current_state)} : {exc}. "
f"{additional_info}"
)
self.stop_reason = StopNotification(False, stop_reason, self.peer_name)
self.session_handler_queue.put_nowait(self.stop_reason)
#end
def _notify_listener_with_ev_data(self):
self.session_listener.notify_target_current(
self.evse_controller.ev_data_context.target_current
)
self.session_listener.notify_target_voltage(
self.evse_controller.ev_data_context.target_voltage
)
self.session_listener.notify_complete_charging_status(
bool(self.evse_controller.ev_data_context.present_soc == 100)
)
self.session_listener.notify_max_current(
self.evse_controller.ev_data_context.session_limits.dc_limits.max_charge_current
)
self.session_listener.notify_max_power(
self.evse_controller.ev_data_context.session_limits.dc_limits.max_charge_power
)
self.session_listener.notify_max_voltage(
self.evse_controller.ev_data_context.session_limits.dc_limits.max_voltage
)
self.session_listener.notify_present_current(
self.evse_controller.evse_data_context.present_current
)
self.session_listener.notify_present_voltage(
self.evse_controller.ev_data_context.present_voltage
)
# soc: state of charge
self.session_listener.notify_remaining_time_to_full_soc(
self.evse_controller.ev_data_context.remaining_time_to_target_soc
)
async def _v2g_payload_extraction(self, message: bytes):
# Step 1
try:
# First extract the V2GMessage payload from the V2GTPMessage ...
v2gtp_msg = V2GTPMessage.from_bytes(self.comm_session.protocol, message)
except InvalidV2GTPMessageError as exc:
_LOGGER.exception("Incoming TCPPacket is not a valid V2GTPMessage")
raise exc
self._notify_listener_with_ev_data()
return v2gtp_msg
async def _get_decoded_message_and_protocol(self, v2gtp_msg: V2GTPMessage):
# Step 2
decoded_message: Union[
SupportedAppProtocolReq,
SupportedAppProtocolRes,
V2GMessageV2,
V2GMessageV20,
V2GMessageDINSPEC,
None,
] = None
decoded_msg_protocol: Optional[Protocol] = None
# Second decode the bytearray into the message
try:
decoded_message = EXI().from_exi(
v2gtp_msg.payload, self.get_exi_ns(v2gtp_msg.payload_type)
)
decoded_msg_protocol = v2gtp_msg.protocol
if hasattr(self.comm_session, "evse_id"):
_LOGGER.trace( # type: ignore[attr-defined]
f"{self.comm_session.evse_id}:::"
f"{v2gtp_msg.payload.hex()}:::"
f"{self.get_exi_ns(v2gtp_msg.payload_type).value}"
)
except V2GMessageValidationError as exc:
_LOGGER.error(
"EXI message (ns=%s) where validation failed: %s",
self.get_exi_ns(v2gtp_msg.payload_type),
v2gtp_msg.payload.hex(),
)
raise exc
except EXIDecodingError as exc:
_LOGGER.exception("%s", exc)
_LOGGER.error(
"EXI message (ns=%s) where error occured: %s",
self.get_exi_ns(v2gtp_msg.payload_type),
v2gtp_msg.payload.hex(),
)
raise exc
# Shouldn't happen, but just to be sure (otherwise mypy would complain)
if not decoded_message:
_LOGGER.error(
"Unusual error situation: decoded_message is None"
"although no EXIDecodingError was raised"
)
self._notify_listener_with_ev_data()
return decoded_message, decoded_msg_protocol
async def _notify_hlc_rcvd_message_and_current_state_process_msg(
self, decoded_message, decoded_msg_protocol: Protocol, v2gtp_msg: V2GTPMessage
):
# Step 3
# Third notify hlc with recieved message and process the message with the current state
try:
_LOGGER.info("%s received", str(decoded_message))
process_message_to_notify(
protocol=decoded_msg_protocol,
message=decoded_message,
time=self.rcvd_msg_time,
timeout=-1.0,
notify_callback=self.session_listener.notify_hlc_received_message,
)
self.current_state.injector_callback = (
self.comm_injector.inject_data_to_response_message
)
await self.current_state.process_message(decoded_message, v2gtp_msg.payload)
except MessageProcessingError as exc:
_LOGGER.exception(
"%s while processing %s", exc.__class__.__name__, exc.message_name
)
raise exc
except FaultyStateImplementationError as exc:
_LOGGER.exception("%s: %s", exc.__class__.__name__, exc)
raise exc
except ValidationError as exc:
_LOGGER.exception("%s: %s", exc.__class__.__name__, exc.raw_errors)
raise exc
except AttributeError as exc:
_LOGGER.exception("%s", exc)
raise exc
if (
self.current_state.next_v2gtp_msg is None
and self.current_state.next_state is not Terminate
):
raise FaultyStateImplementationError(
"Field 'next_v2gtp_msg' is "
"None but must be set because "
"next state is not Terminate"
)
self._notify_listener_with_ev_data()
return
Editor is loading...
Leave a Comment