# OSNAP Lib
# unknown
# python
# a year ago
# 4.2 kB
# 2
# Indexable
# Never
"""OSNAP agent and tool primitives: message construction and registry calls."""

import datetime
import uuid
from enum import Enum

import requests


class Scope(Enum):
    """Visibility values accepted for an agent's ``scope``."""

    PUBLIC = "public"
    PRIVATE = "private"


class MessageType(Enum):
    """Request types accepted by ``OSNAPAgent.create_OSNAP_request``."""

    INFO = "info"
    TASK = "task"


class OSNAPAgentTool:
    """Description of a single tool that an agent exposes.

    Each instance receives a fresh random ``tool_id`` (a UUID4 string).
    ``OSNAPAgent`` sends the instance attributes to the registry as a JSON
    body, so attribute values are presumably expected to be
    JSON-serialisable -- confirm against the registry contract.
    """

    def __init__(
        self,
        name,
        description,
        invoke_endpoint,
        invoke_required_params,
        invoke_optional_params=None,
    ):
        """Store the tool description.

        Args:
            name: Tool name.
            description: Tool description.
            invoke_endpoint: Endpoint used to invoke the tool.
            invoke_required_params: Parameters required on invocation.
            invoke_optional_params: Optional invocation parameters. A new
                empty dict is used only when this is ``None``.
        """
        self.tool_id = str(uuid.uuid4())
        self.name = name
        self.description = description
        self.invoke_endpoint = invoke_endpoint
        self.invoke_required_params = invoke_required_params
        # Test against None rather than truthiness so that an empty mapping
        # supplied by the caller is kept instead of being swapped for a new one.
        self.invoke_optional_params = (
            {} if invoke_optional_params is None else invoke_optional_params
        )

    def __str__(self):
        """Return a one-line summary listing every attribute of the tool."""
        return (
            f"Tool(name={self.name}, description={self.description}, "
            f"tool_id={self.tool_id}, invoke_endpoint={self.invoke_endpoint}, "
            f"invoke_required_params={self.invoke_required_params}, "
            f"invoke_optional_params={self.invoke_optional_params})"
        )

    def __repr__(self):
        """Return the same text as ``__str__``."""
        return self.__str__()


class OSNAPAgent:
    """An agent that builds OSNAP messages and talks to a registry over HTTP.

    Each instance receives a fresh random ``id`` (a UUID4 string).
    """

    def __init__(
        self,
        name,
        description,
        scope,
        info_endpoint,
        tools,
        registry_url,
        task_list=None,
        task_store=None,
    ):
        """Store the agent configuration.

        Args:
            name: Agent name.
            description: Agent description.
            scope: A ``Scope`` member or one of its values (``"public"`` or
                ``"private"``).
            info_endpoint: URL other agents POST requests to.
            tools: Iterable of tool objects; their instance attributes are
                sent to the registry.
            registry_url: Base URL of the registry (used as a prefix for
                ``/register``, ``/update-tools`` and ``/unregister``).
            task_list: Optional task list. A new empty list is used only
                when this is ``None``.
            task_store: Optional task store. A new empty dict is used only
                when this is ``None``.

        Raises:
            ValueError: If ``scope`` is not a valid ``Scope`` value.
        """
        self.id = str(uuid.uuid4())
        self.name = name
        self.description = description
        self.scope = Scope(scope)
        self.info_endpoint = info_endpoint
        self.tools = tools
        self.registry_url = registry_url
        # Test against None rather than truthiness: an empty container passed
        # by the caller (for example a shared, initially empty store) must be
        # kept so that later mutations stay visible to the caller.
        self.task_list = [] if task_list is None else task_list
        self.task_store = {} if task_store is None else task_store
        # Handler hooks start unset; no method visible in this class calls them.
        self.request_validation_handler = None
        self.response_validation_handler = None
        self.error_handling_handler = None
        self.task_processing_handler = None

    @staticmethod
    def _timestamp():
        """Return the current local time as a naive ISO-8601 string."""
        return datetime.datetime.now().isoformat()

    def _tool_payload(self):
        """Return one dict of instance attributes per tool in ``self.tools``."""
        return [dict(vars(tool)) for tool in self.tools]

    def create_OSNAP_request(self, target_agent_id, request_type, priority, request_metadata):
        """Build a request message originating from this agent.

        Args:
            target_agent_id: Id of the agent the request is addressed to.
            request_type: A ``MessageType`` member or one of its values.
            priority: Stored in the message unchanged.
            request_metadata: Stored in the message unchanged.

        Returns:
            A dict with ``requester_id``, ``target_agent_id``,
            ``request_type``, ``priority``, ``request_metadata`` and
            ``timestamp`` keys.

        Raises:
            ValueError: If ``request_type`` is not a valid ``MessageType``.
        """
        return {
            "requester_id": self.id,
            "target_agent_id": target_agent_id,
            "request_type": MessageType(request_type).value,
            "priority": priority,
            "request_metadata": request_metadata,
            "timestamp": self._timestamp(),
        }

    def create_OSNAP_response(self, request_id, status, response_data):
        """Build a response message for ``request_id``.

        Returns:
            A dict with ``request_id``, ``status``, ``response_data`` and
            ``timestamp`` keys.
        """
        return {
            "request_id": request_id,
            "status": status,
            "response_data": response_data,
            "timestamp": self._timestamp(),
        }

    def create_OSNAP_error(self, request_id, error_code, error_description, additional_info=None):
        """Build an error message for ``request_id``.

        Args:
            request_id: Id of the request the error relates to.
            error_code: Stored in the message unchanged.
            error_description: Stored in the message unchanged.
            additional_info: Extra details; any falsy value is replaced by
                an empty dict.

        Returns:
            A dict with ``request_id``, ``error_code``,
            ``error_description``, ``additional_info`` and ``timestamp``
            keys.
        """
        return {
            "request_id": request_id,
            "error_code": error_code,
            "error_description": error_description,
            "additional_info": additional_info if additional_info else {},
            "timestamp": self._timestamp(),
        }

    def send_request_to_agent(self, agent, request, timeout=None):
        """POST ``request`` as JSON to ``agent.info_endpoint``.

        Args:
            agent: Object exposing an ``info_endpoint`` URL.
            request: JSON-serialisable request body.
            timeout: Passed to ``requests.post``; ``None`` (the default)
                waits without limit.

        Returns:
            The decoded JSON body of the reply.

        The HTTP status code is not checked. Errors raised by ``requests``
        (connection failure, timeout, non-JSON body) propagate to the caller.
        """
        response = requests.post(agent.info_endpoint, json=request, timeout=timeout)
        return response.json()

    def get_task_status(self, task_id, agent):
        """Placeholder: not implemented yet, always returns ``None``."""
        # Your logic to get the status of a requested task from an agent
        pass

    def register(self, timeout=None):
        """POST this agent's details and tools to ``{registry_url}/register``.

        Args:
            timeout: Passed to ``requests.post``; ``None`` (the default)
                waits without limit.

        Returns:
            The decoded JSON body of the registry's reply. The HTTP status
            code is not checked.
        """
        registration_data = {
            "id": self.id,
            "name": self.name,
            "description": self.description,
            "scope": self.scope.value,
            "info_endpoint": self.info_endpoint,
            "tools": self._tool_payload(),
        }
        response = requests.post(
            f"{self.registry_url}/register",
            json=registration_data,
            timeout=timeout,
        )
        return response.json()

    def update_tools_on_registry(self, timeout=None):
        """PUT this agent's current tools to ``{registry_url}/update-tools``.

        Args:
            timeout: Passed to ``requests.put``; ``None`` (the default)
                waits without limit.

        Returns:
            The decoded JSON body of the registry's reply. The HTTP status
            code is not checked.
        """
        update_data = {
            "id": self.id,
            "tools": self._tool_payload(),
        }
        response = requests.put(
            f"{self.registry_url}/update-tools",
            json=update_data,
            timeout=timeout,
        )
        return response.json()

    def unregister(self, timeout=None):
        """POST this agent's id to ``{registry_url}/unregister``.

        Args:
            timeout: Passed to ``requests.post``; ``None`` (the default)
                waits without limit.

        Returns:
            The decoded JSON body of the registry's reply. The HTTP status
            code is not checked.
        """
        unregister_data = {
            "id": self.id,
        }
        response = requests.post(
            f"{self.registry_url}/unregister",
            json=unregister_data,
            timeout=timeout,
        )
        return response.json()