Untitled

 avatar
unknown
plain_text
5 months ago
19 kB
3
Indexable
import threading
import serial
import time
import json
from typing import Dict, List
from ureka_framework.logic.device_controller import DeviceController
import ureka_framework.model.data_model.this_device as this_device
import ureka_framework.model.message_model.u_ticket as u_ticket
import ureka_framework
import ureka_framework.environment as environment
import shutil
import os
import logging
import regex as re
from pprint import pprint
import inquirer


# Serial Port Setting
serial_port_name = '/dev/ttyp0'
baud_rate = 115200
timeout = 1

def expand_nested_json(data):
    # 檢查每個鍵值對,嘗試解析內層 JSON 字串
    if isinstance(data, dict):
        for key, value in data.items():
            if isinstance(value, str):
                try:
                    # 嘗試將值解析為 JSON,若成功則展開
                    nested_json = json.loads(value)
                    data[key] = expand_nested_json(nested_json)
                except json.JSONDecodeError:
                    # 若解析失敗,表示不是 JSON 字串,保留原值
                    pass
            elif isinstance(value, dict) or isinstance(value, list):
                # 若值是字典或清單,則遞迴處理
                data[key] = expand_nested_json(value)
    elif isinstance(data, list):
        # 若資料是清單,則遞迴處理每個元素
        data = [expand_nested_json(item) for item in data]

    return data

def print_nested_json(json_message):
    # 將輸入的 JSON 字串轉換為字典
    data = json.loads(json_message)
    # 展開所有嵌套的 JSON 字串
    expanded_data = expand_nested_json(data)
    # 以可讀格式輸出
    print(json.dumps(expanded_data, indent=2, ensure_ascii=False))

# Open Serial
file_handler = logging.FileHandler(f'agents.log')
file_handler.setLevel(logging.DEBUG)
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
file_handler.setFormatter(formatter)
logger = logging.getLogger('agents')
logger.addHandler(file_handler)
logger.setLevel(logging.DEBUG)
open(f'agents.log', 'w').close()
        

def dict_to_jsonstr(dict_obj: Dict[str, str]) -> str:
    return json.dumps(dict_obj, sort_keys=True)

def process_parsed_json(parsed_json):
    
    # Outer json
    message_type = parsed_json.get('message_type', 'Unknown')
    message_operation = parsed_json.get('message_operation', 'Unknown')
    # Inner json
    inner_json = {}
    if 'message_json' in parsed_json:
        inner_json = parsed_json['message_json']
    elif 'message_str' in parsed_json:
        try:
            inner_json = json.loads(parsed_json['message_str'], strict=True)
        except json.JSONDecodeError:
            print("[Agent] Parsing json ERROR")
    

def unescape_json(json_str):
    return json_str.replace('\\"', '"').replace("\\'", "'")

def parse_complex_log(log_data):
    json_pattern = r'\{(?:[^{}]|(?R))*\}'
    
    json_matches = re.findall(json_pattern, log_data, re.DOTALL)

    last_json = ""
    
    for json_str in json_matches:
        try:
            parsed_json = json.loads(json_str, strict=True)
            # print(json.dumps(parsed_json, indent=2, ensure_ascii=False))
            
            # Incase there is inner json
            # if 'message_str' in parsed_json:
            #     try:
            #         message_json = json.loads(unescape_json(parsed_json['message_str']))
            #         parsed_json['message_str'] = message_json
            #         # print(json.dumps(message_json, indent=2, ensure_ascii=False))
            #     except json.JSONDecodeError as e:
            #         print(f"[Agent] Json parsing ERROR: {e}")
            
            # print("+-------------------------------------------------------------+")
            # print("\n[Agent] Original Json:")
            # print(json.dumps(parsed_json, indent=2, ensure_ascii=False))
            process_parsed_json(parsed_json)
            if 'message_type' in parsed_json:
                # print("哈", json_str)
                last_json = json_str if parsed_json['message_type'] == "RTICKET" else last_json
        except json.JSONDecodeError as e:
            print(f"[Agent]Json parsing ERROR: {e}\n{json_str}")
    if last_json == "":
        # raise RuntimeError("No RTICKET received")
        return None
    else :
        # print("last_json", last_json)
        # dump last_json and print it out with indent
        print_nested_json(last_json)
    return last_json

class MySerial:
    def __init__(self, port, baudrate, device_manager, timeout=1):
        self.serial_port = serial.Serial(port=port, baudrate=baudrate, timeout=timeout)
        self.device_manager = device_manager
        self.running = False
    def send_data(self, message):
        print(f"[Agent] Sending: ")
        print_nested_json(message)
        self.serial_port.write(f"{message}\n".encode('utf-8'))
        self.serial_port.flush()

        # print("send_data", environment.my_ticket)
        environment.initialize()

    def start_reading(self):
        """開始執行緒來讀取 serial port 資料"""
        self.running = True
        self.thread = threading.Thread(target=self.read_from_serial)
        self.thread.start()

    def stop_reading(self):
        """停止讀取 serial port 並關閉執行緒"""
        self.running = False
        if self.thread.is_alive():
            self.thread.join()
        self.serial_port.close()
    def process_data(self, data):
        if self.device_manager.cummunicating_agent is None:
            logging.error("No communicating agent")
            return
        logger.debug(f"[{self.device_manager.agents[self.device_manager.cummunicating_agent].shared_data.this_device.device_name}] Received data: {data}")
        received_json = parse_complex_log(data)
        if received_json is None:
            return
        environment.my_ticket = received_json
        self.device_manager.agents[self.device_manager.cummunicating_agent].msg_receiver._recv_xxx_message()
        if(environment.my_ticket):
            self.send_data(environment.my_ticket)
        else:
            self.device_manager.cummunicating_agent = None



    def read_from_serial(self):
        """在執行緒中不斷讀取 serial port 資料,並使用回呼函數處理資料"""
        while self.running:
            if self.serial_port.in_waiting:  # 有資料時讀取
                data = self.serial_port.read(15000).decode('utf-8', errors='ignore')
                if data:
                    self.process_data(data)  # 呼叫回呼函數處理資料
            time.sleep(0.5)  # 防止 CPU 使用過高

    
class DeviceManager:
    def __init__(self):
        self.agents: List[DeviceController] = []
        self.cummunicating_agent: int = None
        self.my_serial = None
        self.types = ["init", "own", "saccess", "access"]
        self.agent_name = ["manufacturer", "admin", "user1", "user2"]




        
        self.remove_storage()
        environment.initialize()
        self.init_all_agents()
    def remove_storage(self):
        # Define the path to the SimpleStorage folder
        # Get the current script directory
        current_dir = os.path.dirname(os.path.abspath(__file__))

        # Define the path to the SimpleStorage folder relative to the current directory
        folder_path = os.path.join(current_dir, 'ureka_framework', 'resource', 'storage', 'SimpleStorage')
        # Check if the folder exists
        if os.path.exists(folder_path):
            # Remove all contents of the SimpleStorage folder
            for item in os.listdir(folder_path):
                item_path = os.path.join(folder_path, item)
                try:
                    if os.path.isfile(item_path):
                        os.remove(item_path)
                        logger.info(f"Removed file: {item_path}")
                    elif os.path.isdir(item_path):
                        shutil.rmtree(item_path)
                        logger.info(f"Removed folder: {item_path}")
                except OSError as e:
                    logger.error(f"Error removing {item_path}: {e}")
        else:
            logger.info(f"Folder does not exist: {folder_path}")
    def init_all_agents(self):
        # 0: cloud_server_dm, 1: user1, 2: user2, 3: user3, -1: iot_device
        for i in range(4):
            agent: DeviceController = DeviceController(
                device_type=this_device.USER_AGENT_OR_CLOUD_SERVER,
                device_name=f"Agent{i}",
            )
            agent.shared_data.this_device.ticket_order = 0
            agent.executor._execute_one_time_intialize_agent_or_server()
            self.agents.append(agent)
    def check_params_issue(self, from_agent, to_agent, ticket_type):
        if from_agent < 0 or from_agent > len(self.agents) - 1:
            print("Invalid from_agent")
            return False
        if to_agent < 0 or to_agent > len(self.agents) - 1:
            print("Invalid to_agent")
            return False
        if ticket_type not in self.types:
            print("Invalid ticket_type")
            return False
        return True
    def check_params_apply(self, agent):
        if agent < 0 or agent > len(self.agents) - 1:
            print("Invalid agent")
            return False
        return True

    def generate_request(self, to_agent, ticket_type, device_id = "no_id"):
        u_ticket_type = ""
        if ticket_type == "init": u_ticket_type = u_ticket.TYPE_INITIALIZATION_UTICKET
        elif ticket_type == "own": u_ticket_type = u_ticket.TYPE_OWNERSHIP_UTICKET
        elif ticket_type == "saccess": u_ticket_type = u_ticket.TYPE_SELFACCESS_UTICKET
        elif ticket_type == "access": u_ticket_type = u_ticket.TYPE_ACCESS_UTICKET
        generated_request: dict = {
            "device_id": f"{device_id}",
            "holder_id": f"{to_agent.shared_data.this_person.person_pub_key_str}",
            "u_ticket_type": f"{u_ticket_type}",
        }
        if "access" in ticket_type:
            generated_task_scope = dict_to_jsonstr({"ALL": "allow"})
            generated_request["task_scope"] = f"{generated_task_scope}"

        return generated_request

    def issue(self, from_agent: int, to_agent: int, ticket_type: str):
        # check params
        if not self.check_params_issue(from_agent, to_agent, ticket_type):
            return -1
        # Get the device_id only if device_table is not empty
        device_keys = list(self.agents[from_agent].shared_data.device_table.keys())
        request = self.generate_request(self.agents[to_agent], ticket_type, device_keys[-1] if device_keys else "no_id")
        logger.debug(f"Request: {request}")
        if from_agent == to_agent:
            self.agents[from_agent].flow_issuer_issue_u_ticket.issuer_issue_u_ticket_to_herself(
                device_id=request["device_id"], arbitrary_dict=request
            )
        else:
            self.agents[from_agent].flow_issuer_issue_u_ticket.issuer_issue_u_ticket_to_holder(
                device_id=request["device_id"], arbitrary_dict=request
            )
            self.agents[to_agent].msg_receiver._recv_xxx_message()
    def apply(self, agent: int, command="HELLO-1"):
        # check params
        if not self.check_params_apply(agent):
            return -1
        # check if agent have any unuse ticket
        unuse_ticket_list = [key for key in self.agents[agent].shared_data.device_table]
        # logger.debug(f"Unuse ticket: {unuse_ticket_list}")
        if not unuse_ticket_list:
            logger.error("No unuse ticket")
            print("No unuse ticket")
            return -1
        
        environment.initialize()
        if command == "":
            command = "HELLO-1"
        self.agents[agent].flow_apply_u_ticket.holder_apply_u_ticket(unuse_ticket_list[0], command)
        self.cummunicating_agent = agent
        self.my_serial.send_data(environment.my_ticket)


        start_time = time.time()
        while True:
            # wait for cummunicating agent to finish
            if self.cummunicating_agent is None:
                break
            # if wait too long, raise error
            print("Waiting for cummunicating agent ", time.time() - start_time)
            if time.time() - start_time > 10:
                logger.error("Cumminicating agent timeout")
                print("Cumminicating agent timeout")
                break
            time.sleep(1)
    def token(self, agent: int, command="HELLO-1"):
        if not self.check_params_apply(agent):
            return -1
        # check if agent have any unuse ticket
        unuse_ticket_list = [key for key in self.agents[agent].shared_data.device_table]
        # logger.debug(f"Unuse ticket: {unuse_ticket_list}")
        if not unuse_ticket_list:
            logger.error("No unuse ticket")
            print("No unuse ticket")
            return -1
        
        environment.initialize()
        self.agents[agent].flow_issue_u_token.holder_send_cmd(unuse_ticket_list[0], command, ("ACCESS_END" in command))

        self.cummunicating_agent = agent
        self.my_serial.send_data(environment.my_ticket)
        start_time = time.time()
        while True:
            # wait for cummunicating agent to finish
            if self.cummunicating_agent is None:
                break
            # if wait too long, raise error
            print("Waiting for cummunicating agent ", time.time() - start_time)
            if time.time() - start_time > 10:
                logger.error("Cumminicating agent timeout")
                print("Cumminicating agent timeout")
                break
            time.sleep(1)
            



def main():
    my_manager = DeviceManager()
    my_serial = MySerial(serial_port_name, baud_rate, my_manager)
    my_manager.my_serial = my_serial
    my_serial.start_reading()
    try:
        while True:
            questions = [
                inquirer.List('action',
                    message="What do you want to do?",
                    choices=["init", "own", "config", "vote1", "vote2", "quit"],
                ),
            ]
            action = inquirer.prompt(questions)["action"]
            if action == "init":
                my_manager.issue(0, 0, "init")
                my_manager.apply(0, "")
                print("Init done")
            elif action == "own":
                my_manager.issue(0, 1, "own")
                my_manager.apply(1, "")
                print("Own done")
            elif action == "config":
                my_manager.issue(1, 1, "saccess")
                my_manager.apply(1, "")
                for i in range(2):
                    questions = [
                        inquirer.Text('command',
                            message=f"Input command for candidate{i+1}",
                        ),
                    ]
                    my_manager.token(1, f"C:{inquirer.prompt(questions)["command"]}")
                print("Config candidate done")
                for i in range([2,3]):
                    # questions = [
                    #     inquirer.Text('command',
                    #         message=f"Input command for voter{i+1}",
                    #     ),
                    # ]
                    my_manager.token(1, f"C-{my_manager.agent_name[i].shared_data.this_person.person_pub_key_str}")
                my_manager.token(1, "ACCESS_END_C")
                print("Config voter done")
            elif action == "vote1":
                my_manager.issue(1, 2, "access")
                my_manager.apply(2, "")
                questions = [
                    inquirer.Text('command',
                        message=f"Input command for voter1",
                    ),
                ]
                my_manager.token(2, f"A")
                my_manager.token(2, f"V:0")
                print("Vote1 done")

                

    finally:
        my_serial.stop_reading()
        print("Serial port closed, susppended")

    # try :
    #     while True:
    #         name_list = ["manufacturer", "admin", "user1", "user2"]
    #         questions = [
    #             inquirer.List('from_agent',
    #                 message="From which agent?",
    #                 choices=[f"{i}: {name_list[i]}" for i in range(len(my_manager.agents))],
    #             ),
    #             inquirer.List('action',
    #                 message="What do you want to do?",
    #                 choices=["issue", "apply", "token", "access end", "quit"],
    #             ),
    #         ]
    #         answers = inquirer.prompt(questions)
    #         from_agent = int(answers["from_agent"][0])
    #         action = answers["action"]

    #         if action == "quit":
    #             break
    #         elif action == "issue":
    #             questions = [
                    
    #                 inquirer.List('to_agent',
    #                     message="To which agent?",
    #                     choices=[f"{i}: {name_list[i]}" for i in range(len(my_manager.agents))],
    #                 ),
    #                 inquirer.List('ticket_type',
    #                     message="Which ticket type?",
    #                     choices=my_manager.types,
    #                 ),
    #             ]
    #             answers = inquirer.prompt(questions)
                
    #             to_agent = int(answers["to_agent"][0])
    #             ticket_type = answers["ticket_type"]
    #             my_manager.issue(from_agent, to_agent, ticket_type)
    #         elif action == "apply":
    #             questions = [
    #                 inquirer.Text('command',
    #                     message="Input command",
    #                 ),
    #             ]
    #             answers = inquirer.prompt(questions)
    #             command = answers["command"]
    #             my_manager.apply(from_agent, command)
    #         elif action == "token":
    #             questions = [
    #                 inquirer.Text('command',
    #                     message="Input command",
    #                 ),
    #             ]
    #             answers = inquirer.prompt(questions)
    #             command = answers["command"]
    #             my_manager.token(from_agent, command)
    #         elif action == "access end":
    #             my_manager.token(from_agent, "ACCESS_END")
    #         # else:
    #         #         print("Invalid input, Retype。")
    #         # my_manager.issue(0, 0, "init")
    #         # my_manager.apply(0, "")
    # finally:
    #     my_serial.stop_reading()
    #     print("Serial port closed, susppended")
    
        
    

if __name__ == "__main__":
    main()
Editor is loading...
Leave a Comment