# Paste-site page header (title "Untitled", author unknown, plain_text, 8.0 kB,
# posted 2 months ago) - not part of the module; the code starts below.
import struct
from dataclasses import dataclass
from enum import Enum, IntFlag
from typing import Any, Callable, Dict

from construct import BitStruct, BitsInteger, Struct, Int16ul, Bytes, IfThenElse, this

# -----------------------------------------------------------------------------
# Enumy i flagi
# -----------------------------------------------------------------------------
class MessageType(Enum):
    """Message-type codes.

    NOTE(review): presumably these are the values carried in the header's
    3-bit MT field; the parser itself stores the raw integer and does not
    convert it to this enum - confirm against the protocol specification.
    """
    FT_ACK = 0    # Acknowledgement
    FT_NACK = 1   # Negative acknowledgement
    # ...add further types here

class PayloadType(Enum):
    """Payload-type codes.

    The integer ``.value`` of a member is what gets used as the key when a
    payload decoder is registered (see the SENSOR_STATUS registration in this
    module).
    """
    SENSOR_STATUS = 0    # Single record, e.g. the state of a sensor
    SENSOR_SERIES = 1    # Array of records
    MIXED_RECORDS = 2    # List of records of differing types
    # ...add further types here

class ExampleFlags(IntFlag):
    """Example bit flags; each member owns exactly one bit (bits 0 through 3)."""

    FLAG_A = 1 << 0
    FLAG_B = 1 << 1
    FLAG_C = 1 << 2
    FLAG_D = 1 << 3

# -----------------------------------------------------------------------------
# Struktury danych
# -----------------------------------------------------------------------------
@dataclass
class Header:
    """Decoded fields of a datagram header.

    ``sender_full`` is kept as the raw 16-bit value; its two bytes are exposed
    separately through the ``sid`` and ``rid`` properties.
    """

    protocol_version: int    # PV (3 bits)
    not_used: int            # NU (5 bits - expected to be zero)
    message_type: int        # MT (3 bits)
    payload_type: int        # PT (5 bits)
    sender_full: int         # 16-bit identifier (SID + RID)
    seq_num: int             # Sequence number
    payload_len: int         # Payload length / Extended Info

    @property
    def sid(self) -> int:
        """Return the low byte (bits 0-7) of ``sender_full``."""
        low_byte_mask = 0x00FF
        return self.sender_full & low_byte_mask

    @property
    def rid(self) -> int:
        """Return the second byte (bits 8-15) of ``sender_full``."""
        high_byte_mask = 0xFF00
        return (self.sender_full & high_byte_mask) >> 8

@dataclass
class Datagram:
    """A parsed datagram: decoded header, payload and verification code."""
    header: Header      # decoded header fields
    payload: Any        # output of the registered payload decoder, or the raw bytes when none is registered
    verif_code: bytes   # verification-code bytes exactly as read from the datagram

# -----------------------------------------------------------------------------
# Rejestracja dekoderów payload
# -----------------------------------------------------------------------------
# Registry: payload-type value -> callable that turns the raw payload bytes
# into a decoded object.
PAYLOAD_DECODERS: Dict[int, Callable[[bytes], Any]] = dict()


def register_payload_decoder(payload_type: int, decoder_func: Callable[[bytes], Any]) -> None:
    """Associate ``decoder_func`` with ``payload_type`` in ``PAYLOAD_DECODERS``.

    A decoder registered earlier for the same payload type is replaced.
    """
    PAYLOAD_DECODERS.update({payload_type: decoder_func})

def decode_payload(payload_bytes: bytes, payload_type: int) -> Any:
    """Decode ``payload_bytes`` with the decoder registered for ``payload_type``.

    Returns the decoder's result, or ``payload_bytes`` unchanged when no
    decoder is registered for that payload type. Exceptions raised by the
    decoder itself propagate to the caller.
    """
    try:
        decoder = PAYLOAD_DECODERS[payload_type]
    except KeyError:
        # No decoder registered: hand the raw bytes back untouched.
        return payload_bytes
    return decoder(payload_bytes)

# -----------------------------------------------------------------------------
# Definicje struktur Construct
# -----------------------------------------------------------------------------
# PROTOCOL field - 1 byte: PV occupies the 3 least-significant bits, NU the
# 5 bits above it.
# Construct's BitStruct consumes bits MSB-first, so the high-order NU field
# must be declared first (the same swap message_desc_struct uses). Declaring
# PV first would parse the byte 0x03 as PV=0, NU=3 instead of PV=3, NU=0.
protocol_struct = BitStruct(
    "nu" / BitsInteger(5),
    "pv" / BitsInteger(3),
)

# NOTE: In this protocol the 3 least-significant bits (LSB) are MT and the
# remaining 5 bits are PT.
# Construct works MSB-first by default, so the fields are declared in swapped order:
message_desc_struct = BitStruct(
    "pt" / BitsInteger(5),
    "mt" / BitsInteger(3)
)

# Header - 8 bytes
header_struct = Struct(
    "protocol" / protocol_struct,          # 1 byte of bit fields (PV, NU)
    "message_desc" / message_desc_struct,  # 1 byte of bit fields (PT, MT)
    "sender" / Int16ul,                    # 16-bit sender identifier
    "seq_num" / Int16ul,                   # sequence number
    "payload_len" / Int16ul                # number of payload bytes that follow the header
)

# Whole datagram: header, a payload whose length is taken from the header, and
# a verification code whose size depends on that payload length.
datagram_struct = Struct(
    "header" / header_struct,
    "payload" / Bytes(this.header.payload_len),
    # 2-byte code for payloads of up to 16 bytes, 4-byte code otherwise
    "verif_code" / IfThenElse(this.header.payload_len <= 16, Bytes(2), Bytes(4))
)

# -----------------------------------------------------------------------------
# Funkcje dekodujące
# -----------------------------------------------------------------------------
def parse_datagram(data: bytes) -> Datagram:
    """Parse raw datagram bytes into a ``Datagram``.

    The bytes are parsed with ``datagram_struct``, the header fields are copied
    into a ``Header``, and the payload is passed through ``decode_payload``
    using the header's payload type. Errors raised by the parser or by a
    registered payload decoder propagate unchanged.
    """
    container = datagram_struct.parse(data)
    raw_header = container.header
    protocol_bits = raw_header.protocol
    desc_bits = raw_header.message_desc
    payload_type = desc_bits.pt

    header = Header(
        protocol_version=protocol_bits.pv,
        not_used=protocol_bits.nu,
        message_type=desc_bits.mt,
        payload_type=payload_type,
        sender_full=raw_header.sender,
        seq_num=raw_header.seq_num,
        payload_len=raw_header.payload_len,
    )
    return Datagram(
        header=header,
        payload=decode_payload(container.payload, payload_type),
        verif_code=container.verif_code,
    )

def decode_datagram(hex_str: str) -> Datagram:
    """Decode a datagram given as a hexadecimal string.

    Raises:
        ValueError: if ``hex_str`` is not valid hexadecimal (the original
            conversion error is chained as the cause).
    """
    try:
        raw = bytes.fromhex(hex_str)
    except ValueError as exc:
        raise ValueError("Nieprawidłowy ciąg szesnastkowy.") from exc
    else:
        # Parsing happens outside the try body so its errors are not rewrapped.
        return parse_datagram(raw)

# -----------------------------------------------------------------------------
# Przykładowy dekoder rekordu SENSOR_STATUS dla PayloadType SENSOR_STATUS (wartość 0)
# -----------------------------------------------------------------------------
def decode_sensor_status(payload_bytes: bytes) -> dict:
    """Decode a SENSOR_STATUS record.

    The record is unpacked as two little-endian signed 16-bit integers:
    temperature first, then humidity.

    Returns:
        A dict with the keys ``"temperature"`` and ``"humidity"``.

    Raises:
        ValueError: if the payload is not exactly 4 bytes long.
    """
    if len(payload_bytes) == 4:
        readings = struct.unpack("<hh", payload_bytes)
        return dict(zip(("temperature", "humidity"), readings))
    raise ValueError("Nieprawidłowa długość rekordu SENSOR_STATUS, oczekiwano 4 bajtów.")

# Import-time side effect: payloads of type SENSOR_STATUS (value 0) are decoded by decode_sensor_status.
register_payload_decoder(PayloadType.SENSOR_STATUS.value, decode_sensor_status)

# -----------------------------------------------------------------------------
# Testy jednostkowe (dla celów developerskich)
# -----------------------------------------------------------------------------
if __name__ == "__main__":
    import unittest

    class TestDatagramDecoder(unittest.TestCase):
        """Checks decode_datagram against hand-built datagrams."""

        VERIF_CODE = bytes.fromhex("aabb")

        def _datagram_hex(self, pt, payload):
            """Return the hex string of a datagram carrying ``payload`` with payload type ``pt``."""
            protocol_byte = 0x03  # PV=3, NU=0
            mt = 1
            # MESSAGE DESC byte is built as (pt << 3) | mt
            msg_desc_byte = ((pt & 0x1F) << 3) | (mt & 0x07)
            # protocol, message desc, sender, sequence number, payload length
            header_bytes = struct.pack(
                "<BBHHH", protocol_byte, msg_desc_byte, 0x1234, 1, len(payload)
            )
            return (header_bytes + payload + self.VERIF_CODE).hex()

        def test_sensor_status_decoding(self):
            # Datagram carrying a SENSOR_STATUS record (pt = 0)
            sensor_payload = struct.pack("<hh", 25, 50)
            decoded = decode_datagram(self._datagram_hex(pt=0, payload=sensor_payload))

            header = decoded.header
            self.assertEqual(header.protocol_version, 3)
            self.assertEqual(header.not_used, 0)
            self.assertEqual(header.message_type, 1)
            self.assertEqual(header.payload_type, 0)
            self.assertEqual(header.sender_full, 0x1234)
            self.assertEqual(header.sid, 0x34)
            self.assertEqual(header.rid, 0x12)
            self.assertEqual(header.seq_num, 1)
            self.assertEqual(header.payload_len, 4)

            self.assertIsInstance(decoded.payload, dict)
            self.assertEqual(decoded.payload["temperature"], 25)
            self.assertEqual(decoded.payload["humidity"], 50)
            self.assertEqual(decoded.verif_code, self.VERIF_CODE)

        def test_unknown_payload(self):
            # A payload type without a registered decoder (here pt = 2) must
            # leave the payload as raw bytes.
            raw_payload = bytes.fromhex("deadbeef")
            decoded = decode_datagram(self._datagram_hex(pt=2, payload=raw_payload))

            # The logical payload-type value is expected to be 2.
            self.assertEqual(decoded.header.payload_type, 2)
            self.assertEqual(decoded.payload, raw_payload)

    unittest.main()
# Paste-site page footer ("Editor is loading...", "Leave a Comment") - not part of the module.