OSNAP Lib
unknown
python
3 years ago
4.2 kB
8
Indexable
import uuid
import requests
from enum import Enum
import datetime
class Scope(Enum):
PUBLIC = "public"
PRIVATE = "private"
class MessageType(Enum):
INFO = "info"
TASK = "task"
class OSNAPAgentTool:
def __init__(
self,
name,
description,
invoke_endpoint,
invoke_required_params,
invoke_optional_params=None,
):
self.tool_id = str(uuid.uuid4())
self.name = name
self.description = description
self.invoke_endpoint = invoke_endpoint
self.invoke_required_params = invoke_required_params
self.invoke_optional_params = invoke_optional_params if invoke_optional_params else {}
def __str__(self):
return f"Tool(name={self.name}, description={self.description}, tool_id={self.tool_id}, invoke_endpoint={self.invoke_endpoint}, invoke_required_params={self.invoke_required_params}, invoke_optional_params={self.invoke_optional_params})"
def __repr__(self):
return self.__str__()
class OSNAPAgent:
def __init__(
self,
name,
description,
scope,
info_endpoint,
tools,
registry_url,
task_list=None,
task_store=None,
):
self.id = str(uuid.uuid4())
self.name = name
self.description = description
self.scope = Scope(scope)
self.info_endpoint = info_endpoint
self.tools = tools
self.registry_url = registry_url
self.task_list = task_list if task_list else []
self.task_store = task_store if task_store else {}
self.request_validation_handler = None
self.response_validation_handler = None
self.error_handling_handler = None
self.task_processing_handler = None
def create_OSNAP_request(self, target_agent_id, request_type, priority, request_metadata):
timestamp = datetime.datetime.now().isoformat()
request = {
"requester_id": self.id,
"target_agent_id": target_agent_id,
"request_type": MessageType(request_type).value,
"priority": priority,
"request_metadata": request_metadata,
"timestamp": timestamp
}
return request
def create_OSNAP_response(self, request_id, status, response_data):
timestamp = datetime.datetime.now().isoformat()
response = {
"request_id": request_id,
"status": status,
"response_data": response_data,
"timestamp": timestamp
}
return response
def create_OSNAP_error(self, request_id, error_code, error_description, additional_info=None):
timestamp = datetime.datetime.now().isoformat()
error = {
"request_id": request_id,
"error_code": error_code,
"error_description": error_description,
"additional_info": additional_info if additional_info else {},
"timestamp": timestamp
}
return error
def send_request_to_agent(self, agent, request):
response = requests.post(agent.info_endpoint, json=request)
return response.json()
def get_task_status(self, task_id, agent):
# Your logic to get the status of a requested task from an agent
pass
def register(self):
registration_data = {
"id": self.id,
"name": self.name,
"description": self.description,
"scope": self.scope.value,
"info_endpoint": self.info_endpoint,
"tools": [tool.__dict__ for tool in self.tools]
}
response = requests.post(f"{self.registry_url}/register", json=registration_data)
return response.json()
def update_tools_on_registry(self):
update_data = {
"id": self.id,
"tools": [tool.__dict__ for tool in self.tools],
}
response = requests.put(f"{self.registry_url}/update-tools", json=update_data)
return response.json()
def unregister(self):
unregister_data = {
"id": self.id,
}
response = requests.post(f"{self.registry_url}/unregister", json=unregister_data)
return response.json()
Editor is loading...