Untitled
unknown
plain_text
a year ago
19 kB
5
Indexable
import threading
import serial
import time
import json
from typing import Dict, List
from ureka_framework.logic.device_controller import DeviceController
import ureka_framework.model.data_model.this_device as this_device
import ureka_framework.model.message_model.u_ticket as u_ticket
import ureka_framework
import ureka_framework.environment as environment
import shutil
import os
import logging
import regex as re
from pprint import pprint
import inquirer
# Serial Port Setting
serial_port_name = '/dev/ttyp0'
baud_rate = 115200
timeout = 1
def expand_nested_json(data):
# 檢查每個鍵值對,嘗試解析內層 JSON 字串
if isinstance(data, dict):
for key, value in data.items():
if isinstance(value, str):
try:
# 嘗試將值解析為 JSON,若成功則展開
nested_json = json.loads(value)
data[key] = expand_nested_json(nested_json)
except json.JSONDecodeError:
# 若解析失敗,表示不是 JSON 字串,保留原值
pass
elif isinstance(value, dict) or isinstance(value, list):
# 若值是字典或清單,則遞迴處理
data[key] = expand_nested_json(value)
elif isinstance(data, list):
# 若資料是清單,則遞迴處理每個元素
data = [expand_nested_json(item) for item in data]
return data
def print_nested_json(json_message):
# 將輸入的 JSON 字串轉換為字典
data = json.loads(json_message)
# 展開所有嵌套的 JSON 字串
expanded_data = expand_nested_json(data)
# 以可讀格式輸出
print(json.dumps(expanded_data, indent=2, ensure_ascii=False))
# Open Serial
file_handler = logging.FileHandler(f'agents.log')
file_handler.setLevel(logging.DEBUG)
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
file_handler.setFormatter(formatter)
logger = logging.getLogger('agents')
logger.addHandler(file_handler)
logger.setLevel(logging.DEBUG)
open(f'agents.log', 'w').close()
def dict_to_jsonstr(dict_obj: Dict[str, str]) -> str:
return json.dumps(dict_obj, sort_keys=True)
def process_parsed_json(parsed_json):
# Outer json
message_type = parsed_json.get('message_type', 'Unknown')
message_operation = parsed_json.get('message_operation', 'Unknown')
# Inner json
inner_json = {}
if 'message_json' in parsed_json:
inner_json = parsed_json['message_json']
elif 'message_str' in parsed_json:
try:
inner_json = json.loads(parsed_json['message_str'], strict=True)
except json.JSONDecodeError:
print("[Agent] Parsing json ERROR")
def unescape_json(json_str):
return json_str.replace('\\"', '"').replace("\\'", "'")
def parse_complex_log(log_data):
json_pattern = r'\{(?:[^{}]|(?R))*\}'
json_matches = re.findall(json_pattern, log_data, re.DOTALL)
last_json = ""
for json_str in json_matches:
try:
parsed_json = json.loads(json_str, strict=True)
# print(json.dumps(parsed_json, indent=2, ensure_ascii=False))
# Incase there is inner json
# if 'message_str' in parsed_json:
# try:
# message_json = json.loads(unescape_json(parsed_json['message_str']))
# parsed_json['message_str'] = message_json
# # print(json.dumps(message_json, indent=2, ensure_ascii=False))
# except json.JSONDecodeError as e:
# print(f"[Agent] Json parsing ERROR: {e}")
# print("+-------------------------------------------------------------+")
# print("\n[Agent] Original Json:")
# print(json.dumps(parsed_json, indent=2, ensure_ascii=False))
process_parsed_json(parsed_json)
if 'message_type' in parsed_json:
# print("哈", json_str)
last_json = json_str if parsed_json['message_type'] == "RTICKET" else last_json
except json.JSONDecodeError as e:
print(f"[Agent]Json parsing ERROR: {e}\n{json_str}")
if last_json == "":
# raise RuntimeError("No RTICKET received")
return None
else :
# print("last_json", last_json)
# dump last_json and print it out with indent
print_nested_json(last_json)
return last_json
class MySerial:
def __init__(self, port, baudrate, device_manager, timeout=1):
self.serial_port = serial.Serial(port=port, baudrate=baudrate, timeout=timeout)
self.device_manager = device_manager
self.running = False
def send_data(self, message):
print(f"[Agent] Sending: ")
print_nested_json(message)
self.serial_port.write(f"{message}\n".encode('utf-8'))
self.serial_port.flush()
# print("send_data", environment.my_ticket)
environment.initialize()
def start_reading(self):
"""開始執行緒來讀取 serial port 資料"""
self.running = True
self.thread = threading.Thread(target=self.read_from_serial)
self.thread.start()
def stop_reading(self):
"""停止讀取 serial port 並關閉執行緒"""
self.running = False
if self.thread.is_alive():
self.thread.join()
self.serial_port.close()
def process_data(self, data):
if self.device_manager.cummunicating_agent is None:
logging.error("No communicating agent")
return
logger.debug(f"[{self.device_manager.agents[self.device_manager.cummunicating_agent].shared_data.this_device.device_name}] Received data: {data}")
received_json = parse_complex_log(data)
if received_json is None:
return
environment.my_ticket = received_json
self.device_manager.agents[self.device_manager.cummunicating_agent].msg_receiver._recv_xxx_message()
if(environment.my_ticket):
self.send_data(environment.my_ticket)
else:
self.device_manager.cummunicating_agent = None
def read_from_serial(self):
"""在執行緒中不斷讀取 serial port 資料,並使用回呼函數處理資料"""
while self.running:
if self.serial_port.in_waiting: # 有資料時讀取
data = self.serial_port.read(15000).decode('utf-8', errors='ignore')
if data:
self.process_data(data) # 呼叫回呼函數處理資料
time.sleep(0.5) # 防止 CPU 使用過高
class DeviceManager:
def __init__(self):
self.agents: List[DeviceController] = []
self.cummunicating_agent: int = None
self.my_serial = None
self.types = ["init", "own", "saccess", "access"]
self.agent_name = ["manufacturer", "admin", "user1", "user2"]
self.remove_storage()
environment.initialize()
self.init_all_agents()
def remove_storage(self):
# Define the path to the SimpleStorage folder
# Get the current script directory
current_dir = os.path.dirname(os.path.abspath(__file__))
# Define the path to the SimpleStorage folder relative to the current directory
folder_path = os.path.join(current_dir, 'ureka_framework', 'resource', 'storage', 'SimpleStorage')
# Check if the folder exists
if os.path.exists(folder_path):
# Remove all contents of the SimpleStorage folder
for item in os.listdir(folder_path):
item_path = os.path.join(folder_path, item)
try:
if os.path.isfile(item_path):
os.remove(item_path)
logger.info(f"Removed file: {item_path}")
elif os.path.isdir(item_path):
shutil.rmtree(item_path)
logger.info(f"Removed folder: {item_path}")
except OSError as e:
logger.error(f"Error removing {item_path}: {e}")
else:
logger.info(f"Folder does not exist: {folder_path}")
def init_all_agents(self):
# 0: cloud_server_dm, 1: user1, 2: user2, 3: user3, -1: iot_device
for i in range(4):
agent: DeviceController = DeviceController(
device_type=this_device.USER_AGENT_OR_CLOUD_SERVER,
device_name=f"Agent{i}",
)
agent.shared_data.this_device.ticket_order = 0
agent.executor._execute_one_time_intialize_agent_or_server()
self.agents.append(agent)
def check_params_issue(self, from_agent, to_agent, ticket_type):
if from_agent < 0 or from_agent > len(self.agents) - 1:
print("Invalid from_agent")
return False
if to_agent < 0 or to_agent > len(self.agents) - 1:
print("Invalid to_agent")
return False
if ticket_type not in self.types:
print("Invalid ticket_type")
return False
return True
def check_params_apply(self, agent):
if agent < 0 or agent > len(self.agents) - 1:
print("Invalid agent")
return False
return True
def generate_request(self, to_agent, ticket_type, device_id = "no_id"):
u_ticket_type = ""
if ticket_type == "init": u_ticket_type = u_ticket.TYPE_INITIALIZATION_UTICKET
elif ticket_type == "own": u_ticket_type = u_ticket.TYPE_OWNERSHIP_UTICKET
elif ticket_type == "saccess": u_ticket_type = u_ticket.TYPE_SELFACCESS_UTICKET
elif ticket_type == "access": u_ticket_type = u_ticket.TYPE_ACCESS_UTICKET
generated_request: dict = {
"device_id": f"{device_id}",
"holder_id": f"{to_agent.shared_data.this_person.person_pub_key_str}",
"u_ticket_type": f"{u_ticket_type}",
}
if "access" in ticket_type:
generated_task_scope = dict_to_jsonstr({"ALL": "allow"})
generated_request["task_scope"] = f"{generated_task_scope}"
return generated_request
def issue(self, from_agent: int, to_agent: int, ticket_type: str):
# check params
if not self.check_params_issue(from_agent, to_agent, ticket_type):
return -1
# Get the device_id only if device_table is not empty
device_keys = list(self.agents[from_agent].shared_data.device_table.keys())
request = self.generate_request(self.agents[to_agent], ticket_type, device_keys[-1] if device_keys else "no_id")
logger.debug(f"Request: {request}")
if from_agent == to_agent:
self.agents[from_agent].flow_issuer_issue_u_ticket.issuer_issue_u_ticket_to_herself(
device_id=request["device_id"], arbitrary_dict=request
)
else:
self.agents[from_agent].flow_issuer_issue_u_ticket.issuer_issue_u_ticket_to_holder(
device_id=request["device_id"], arbitrary_dict=request
)
self.agents[to_agent].msg_receiver._recv_xxx_message()
def apply(self, agent: int, command="HELLO-1"):
# check params
if not self.check_params_apply(agent):
return -1
# check if agent have any unuse ticket
unuse_ticket_list = [key for key in self.agents[agent].shared_data.device_table]
# logger.debug(f"Unuse ticket: {unuse_ticket_list}")
if not unuse_ticket_list:
logger.error("No unuse ticket")
print("No unuse ticket")
return -1
environment.initialize()
if command == "":
command = "HELLO-1"
self.agents[agent].flow_apply_u_ticket.holder_apply_u_ticket(unuse_ticket_list[0], command)
self.cummunicating_agent = agent
self.my_serial.send_data(environment.my_ticket)
start_time = time.time()
while True:
# wait for cummunicating agent to finish
if self.cummunicating_agent is None:
break
# if wait too long, raise error
print("Waiting for cummunicating agent ", time.time() - start_time)
if time.time() - start_time > 10:
logger.error("Cumminicating agent timeout")
print("Cumminicating agent timeout")
break
time.sleep(1)
def token(self, agent: int, command="HELLO-1"):
if not self.check_params_apply(agent):
return -1
# check if agent have any unuse ticket
unuse_ticket_list = [key for key in self.agents[agent].shared_data.device_table]
# logger.debug(f"Unuse ticket: {unuse_ticket_list}")
if not unuse_ticket_list:
logger.error("No unuse ticket")
print("No unuse ticket")
return -1
environment.initialize()
self.agents[agent].flow_issue_u_token.holder_send_cmd(unuse_ticket_list[0], command, ("ACCESS_END" in command))
self.cummunicating_agent = agent
self.my_serial.send_data(environment.my_ticket)
start_time = time.time()
while True:
# wait for cummunicating agent to finish
if self.cummunicating_agent is None:
break
# if wait too long, raise error
print("Waiting for cummunicating agent ", time.time() - start_time)
if time.time() - start_time > 10:
logger.error("Cumminicating agent timeout")
print("Cumminicating agent timeout")
break
time.sleep(1)
def main():
my_manager = DeviceManager()
my_serial = MySerial(serial_port_name, baud_rate, my_manager)
my_manager.my_serial = my_serial
my_serial.start_reading()
try:
while True:
questions = [
inquirer.List('action',
message="What do you want to do?",
choices=["init", "own", "config", "vote1", "vote2", "quit"],
),
]
action = inquirer.prompt(questions)["action"]
if action == "init":
my_manager.issue(0, 0, "init")
my_manager.apply(0, "")
print("Init done")
elif action == "own":
my_manager.issue(0, 1, "own")
my_manager.apply(1, "")
print("Own done")
elif action == "config":
my_manager.issue(1, 1, "saccess")
my_manager.apply(1, "")
for i in range(2):
questions = [
inquirer.Text('command',
message=f"Input command for candidate{i+1}",
),
]
my_manager.token(1, f"C:{inquirer.prompt(questions)["command"]}")
print("Config candidate done")
for i in range([2,3]):
# questions = [
# inquirer.Text('command',
# message=f"Input command for voter{i+1}",
# ),
# ]
my_manager.token(1, f"C-{my_manager.agent_name[i].shared_data.this_person.person_pub_key_str}")
my_manager.token(1, "ACCESS_END_C")
print("Config voter done")
elif action == "vote1":
my_manager.issue(1, 2, "access")
my_manager.apply(2, "")
questions = [
inquirer.Text('command',
message=f"Input command for voter1",
),
]
my_manager.token(2, f"A")
my_manager.token(2, f"V:0")
print("Vote1 done")
finally:
my_serial.stop_reading()
print("Serial port closed, susppended")
# try :
# while True:
# name_list = ["manufacturer", "admin", "user1", "user2"]
# questions = [
# inquirer.List('from_agent',
# message="From which agent?",
# choices=[f"{i}: {name_list[i]}" for i in range(len(my_manager.agents))],
# ),
# inquirer.List('action',
# message="What do you want to do?",
# choices=["issue", "apply", "token", "access end", "quit"],
# ),
# ]
# answers = inquirer.prompt(questions)
# from_agent = int(answers["from_agent"][0])
# action = answers["action"]
# if action == "quit":
# break
# elif action == "issue":
# questions = [
# inquirer.List('to_agent',
# message="To which agent?",
# choices=[f"{i}: {name_list[i]}" for i in range(len(my_manager.agents))],
# ),
# inquirer.List('ticket_type',
# message="Which ticket type?",
# choices=my_manager.types,
# ),
# ]
# answers = inquirer.prompt(questions)
# to_agent = int(answers["to_agent"][0])
# ticket_type = answers["ticket_type"]
# my_manager.issue(from_agent, to_agent, ticket_type)
# elif action == "apply":
# questions = [
# inquirer.Text('command',
# message="Input command",
# ),
# ]
# answers = inquirer.prompt(questions)
# command = answers["command"]
# my_manager.apply(from_agent, command)
# elif action == "token":
# questions = [
# inquirer.Text('command',
# message="Input command",
# ),
# ]
# answers = inquirer.prompt(questions)
# command = answers["command"]
# my_manager.token(from_agent, command)
# elif action == "access end":
# my_manager.token(from_agent, "ACCESS_END")
# # else:
# # print("Invalid input, Retype。")
# # my_manager.issue(0, 0, "init")
# # my_manager.apply(0, "")
# finally:
# my_serial.stop_reading()
# print("Serial port closed, susppended")
if __name__ == "__main__":
main()
Editor is loading...
Leave a Comment