Untitled
unknown
plain_text
2 months ago
8.0 kB
3
Indexable
import struct from dataclasses import dataclass from enum import Enum, IntFlag from typing import Any, Callable, Dict from construct import BitStruct, BitsInteger, Struct, Int16ul, Bytes, IfThenElse, this # ----------------------------------------------------------------------------- # Enumy i flagi # ----------------------------------------------------------------------------- class MessageType(Enum): FT_ACK = 0 # Potwierdzenie FT_NACK = 1 # Negatywne potwierdzenie # …dodaj inne typy class PayloadType(Enum): SENSOR_STATUS = 0 # Pojedynczy rekord – np. stan czujnika SENSOR_SERIES = 1 # Tablica rekordów MIXED_RECORDS = 2 # Lista rekordów różnych typów # …dodaj inne typy class ExampleFlags(IntFlag): FLAG_A = 1 FLAG_B = 2 FLAG_C = 4 FLAG_D = 8 # ----------------------------------------------------------------------------- # Struktury danych # ----------------------------------------------------------------------------- @dataclass class Header: protocol_version: int # PV (3 bity) not_used: int # NU (5 bitów – powinny być zerami) message_type: int # MT (3 bity) payload_type: int # PT (5 bitów) sender_full: int # 16-bitowy identyfikator (SID+RID) seq_num: int # Numer sekwencyjny payload_len: int # Długość payload / Extended Info @property def sid(self) -> int: return self.sender_full & 0xFF @property def rid(self) -> int: return (self.sender_full >> 8) & 0xFF @dataclass class Datagram: header: Header payload: Any verif_code: bytes # ----------------------------------------------------------------------------- # Rejestracja dekoderów payload # ----------------------------------------------------------------------------- PAYLOAD_DECODERS: Dict[int, Callable[[bytes], Any]] = {} def register_payload_decoder(payload_type: int, decoder_func: Callable[[bytes], Any]) -> None: PAYLOAD_DECODERS[payload_type] = decoder_func def decode_payload(payload_bytes: bytes, payload_type: int) -> Any: if payload_type in PAYLOAD_DECODERS: return PAYLOAD_DECODERS[payload_type](payload_bytes) return payload_bytes # ----------------------------------------------------------------------------- # Definicje struktur Construct # ----------------------------------------------------------------------------- # Pole PROTOCOL – 1 bajt: 3 bity PV, 5 bitów NU protocol_struct = BitStruct( "pv" / BitsInteger(3), "nu" / BitsInteger(5) ) # UWAGA: W naszym protokole najniższe 3 bity (LSB) to MT, a pozostałe 5 bitów to PT. # Construct domyślnie operuje w kolejności MSB-first, dlatego zamieniamy kolejność: message_desc_struct = BitStruct( "pt" / BitsInteger(5), "mt" / BitsInteger(3) ) # Nagłówek – 8 bajtów header_struct = Struct( "protocol" / protocol_struct, "message_desc" / message_desc_struct, "sender" / Int16ul, "seq_num" / Int16ul, "payload_len" / Int16ul ) # Cały datagram – nagłówek, payload o dynamicznej długości, verification code zależny od długości payload datagram_struct = Struct( "header" / header_struct, "payload" / Bytes(this.header.payload_len), "verif_code" / IfThenElse(this.header.payload_len <= 16, Bytes(2), Bytes(4)) ) # ----------------------------------------------------------------------------- # Funkcje dekodujące # ----------------------------------------------------------------------------- def parse_datagram(data: bytes) -> Datagram: parsed = datagram_struct.parse(data) hdr = parsed.header header_obj = Header( protocol_version=hdr.protocol.pv, not_used=hdr.protocol.nu, message_type=hdr.message_desc.mt, payload_type=hdr.message_desc.pt, sender_full=hdr.sender, seq_num=hdr.seq_num, payload_len=hdr.payload_len ) decoded_payload = decode_payload(parsed.payload, hdr.message_desc.pt) return Datagram(header=header_obj, payload=decoded_payload, verif_code=parsed.verif_code) def decode_datagram(hex_str: str) -> Datagram: try: data = bytes.fromhex(hex_str) except ValueError as e: raise ValueError("Nieprawidłowy ciąg szesnastkowy.") from e return parse_datagram(data) # ----------------------------------------------------------------------------- # Przykładowy dekoder rekordu SENSOR_STATUS dla PayloadType SENSOR_STATUS (wartość 0) # ----------------------------------------------------------------------------- def decode_sensor_status(payload_bytes: bytes) -> dict: if len(payload_bytes) != 4: raise ValueError("Nieprawidłowa długość rekordu SENSOR_STATUS, oczekiwano 4 bajtów.") temperature, humidity = struct.unpack("<hh", payload_bytes) return {"temperature": temperature, "humidity": humidity} register_payload_decoder(PayloadType.SENSOR_STATUS.value, decode_sensor_status) # ----------------------------------------------------------------------------- # Testy jednostkowe (dla celów developerskich) # ----------------------------------------------------------------------------- if __name__ == "__main__": import unittest class TestDatagramDecoder(unittest.TestCase): def test_sensor_status_decoding(self): # Przygotowujemy datagram z SENSOR_STATUS protocol_byte = 0x03 # PV=3, NU=0 mt = 1 pt = 0 # SENSOR_STATUS # MESSAGE DESC: budujemy jako (pt << 3) | mt msg_desc_byte = ((pt & 0x1F) << 3) | (mt & 0x07) sender = struct.pack("<H", 0x1234) seq_num = struct.pack("<H", 1) payload_len = struct.pack("<H", 4) header_bytes = bytes([protocol_byte, msg_desc_byte]) + sender + seq_num + payload_len payload = struct.pack("<hh", 25, 50) verif_code = bytes([0xAA, 0xBB]) datagram_bytes = header_bytes + payload + verif_code hex_str = datagram_bytes.hex() decoded = decode_datagram(hex_str) self.assertEqual(decoded.header.protocol_version, 3) self.assertEqual(decoded.header.not_used, 0) self.assertEqual(decoded.header.message_type, 1) self.assertEqual(decoded.header.payload_type, 0) self.assertEqual(decoded.header.sender_full, 0x1234) self.assertEqual(decoded.header.sid, 0x34) self.assertEqual(decoded.header.rid, 0x12) self.assertEqual(decoded.header.seq_num, 1) self.assertEqual(decoded.header.payload_len, 4) self.assertIsInstance(decoded.payload, dict) self.assertEqual(decoded.payload["temperature"], 25) self.assertEqual(decoded.payload["humidity"], 50) self.assertEqual(decoded.verif_code, verif_code) def test_unknown_payload(self): # Dla nieznanego typu payload (np. pt = 2) testujemy, że payload pozostaje surowymi bajtami. protocol_byte = 0x03 # PV=3, NU=0 mt = 1 pt = 2 # Nieznany typ – testerzy chcą, aby wartość logiczna była 2 msg_desc_byte = ((pt & 0x1F) << 3) | (mt & 0x07) sender = struct.pack("<H", 0x1234) seq_num = struct.pack("<H", 1) payload_len = struct.pack("<H", 4) header_bytes = bytes([protocol_byte, msg_desc_byte]) + sender + seq_num + payload_len payload = bytes([0xDE, 0xAD, 0xBE, 0xEF]) verif_code = bytes([0xAA, 0xBB]) datagram_bytes = header_bytes + payload + verif_code hex_str = datagram_bytes.hex() decoded = decode_datagram(hex_str) # Teraz oczekujemy, że header.payload_type będzie równe 2 (zgodnie z naszą konwencją) self.assertEqual(decoded.header.payload_type, 2) self.assertEqual(decoded.payload, payload) unittest.main()
Editor is loading...
Leave a Comment