import os
import json
import enum
import time
import uuid
import requests
from typing import Callable, Dict, Union
from cryptography.hazmat.primitives import serialization, hashes
from cryptography.hazmat.primitives.asymmetric import padding
from cryptography.hazmat.primitives.serialization import Encoding, PublicFormat, PrivateFormat, NoEncryption
class Scope(enum.Enum):
PUBLIC = "public"
PRIVATE = "private"
class SignatureUtil:
@staticmethod
def generate_key_pair():
private_key = rsa.generate_private_key(public_exponent=65537, key_size=2048)
private_pem = private_key.private_bytes(Encoding.PEM, PrivateFormat.PKCS8, NoEncryption())
public_pem = private_key.public_key().public_bytes(Encoding.PEM, PublicFormat.SubjectPublicKeyInfo)
return private_pem, public_pem
@staticmethod
def sign_data(private_key_pem, data):
private_key = serialization.load_pem_private_key(private_key_pem, None)
signature = private_key.sign(data.encode(), padding.PKCS1v15(), hashes.SHA256())
return signature
@staticmethod
def verify_signature(public_key_pem, data, signature):
public_key = serialization.load_pem_public_key(public_key_pem)
try:
public_key.verify(signature, data.encode(), padding.PKCS1v15(), hashes.SHA256())
return True
except Exception as e:
return False
class OSNAPAgent:
def __init__(self, name: str, description: str, scope: Scope, info_endpoint: str, registry_url: str, handlers: Dict[str, Callable]):
self.id = str(uuid.uuid4())
self.name = name
self.description = description
self.scope = scope
self.info_endpoint = info_endpoint
self.registry_url = registry_url
self.handlers = handlers
# Generate a key pair
self.private_key_pem, self.public_key_pem = SignatureUtil.generate_key_pair()
def register(self) -> None:
data = {
"agent_id": self.id,
"name": self.name,
"description": self.description,
"scope": self.scope.value,
"info_endpoint": self.info_endpoint,
"public_key": self.public_key_pem
}
response = requests.post(f"{self.registry_url}/register", json=data)
if response.status_code == 200:
print("Agent registered successfully.")
else:
print("Failed to register agent:", response.text)
def unregister(self) -> None:
data = {
"agent_id": self.id,
}
response = requests.post(f"{self.registry_url}/unregister", json=data)
if response.status_code == 200:
print("Agent unregistered successfully.")
else:
print("Failed to unregister agent:", response.text)
def update_tools_on_registry(self) -> None:
data = {
"agent_id": self.id,
"tools": self.handlers['get_tools']()
}
response = requests.post(f"{self.registry_url}/update_tools", json=data)
if response.status_code == 200:
print("Agent tools updated successfully.")
else:
print("Failed to update agent tools:", response.text)
def send_request_to_agent(self, destination_agent: 'OSNAPAgent', request: OSNAPRequest) -> Union[OSNAPResponse, OSNAPError]:
if self.handlers['request_validation'](request) and \
SignatureUtil.verify_signature(destination_agent.public_key_p_key_pem, request.payload, request.signature):
response = destination_agent.process_request(request)
if self.handlers['response_validation'](response) and \
SignatureUtil.verify_signature(destination_agent.public_key_pem, response.payload, response.signature):
return response
else:
return OSNAPError("Invalid response signature")
else:
return OSNAPError("Invalid request signature")
def process_request(self, request: OSNAPRequest) -> Union[OSNAPResponse, OSNAPError]:
if request.request_type == "info":
handler = self.handlers.get('info')
if handler:
return handler(request)
else:
return OSNAPError("Info handler not found")
elif request.request_type == "task":
handler = self.handlers.get(request.task_name)
if handler:
return handler(request)
else:
return OSNAPError("Task handler not found")
else:
return OSNAPError("Invalid request type")
def create_osnap_request(self, request_type: str, task_name: str = None, priority: int = 0, request_metadata: Dict = None) -> OSNAPRequest:
request = OSNAPRequest(requester_id=self.id, request_type=request_type, task_name=task_name, priority=priority, request_metadata=request_metadata)
signature = SignatureUtil.sign_data(self.private_key_pem, request.payload)
request.signature = signature
return request
def create_osnap_response(self, payload: Dict) -> OSNAPResponse:
response = OSNAPResponse(payload=payload)
signature = SignatureUtil.sign_data(self.private_key_pem, response.payload)
response.signature = signature
return response
def create_osnap_error(self, message: str) -> OSNAPError:
error = OSNAPError(message)
signature = SignatureUtil.sign_data(self.private_key_pem, error.payload)
error.signature = signature
return error
class OSNAPRequest:
def __init__(self, requester_id: str, request_type: str, task_name: str = None, priority: int = 0, request_metadata: Dict = None):
self.requester_id = requester_id
self.request_type = request_type
self.task_name = task_name
self.priority = priority
self.request_metadata = request_metadata or {}
self.timestamp = time.time()
self.payload = json.dumps({
"requester_id": self.requester_id,
"request_type": self.request_type,
"task_name": self.task_name,
"priority": self.priority,
"request_metadata": self.request_metadata,
"timestamp": self.timestamp
})
self.signature = None
class OSNAPResponse:
def __init__(self, payload: Dict):
self.payload = json.dumps(payload)
self.signature = None
class OSNAPError:
def __init__(self, message: str):
self.payload = json.dumps({"error": message})
self.signature = None